From c5420fb7ecb4385e2884587aaed9292f6a7224c0 Mon Sep 17 00:00:00 2001 From: Henry Fontanier Date: Fri, 10 Nov 2023 15:49:34 +0100 Subject: [PATCH 01/22] feat: implement DB get schema (w/ endpoint) --- front/lib/core_api.ts | 34 ++++++++++++++++++++++++++++++++-- 1 file changed, 32 insertions(+), 2 deletions(-) diff --git a/front/lib/core_api.ts b/front/lib/core_api.ts index c8b8134d699f..e939bc0ccb4b 100644 --- a/front/lib/core_api.ts +++ b/front/lib/core_api.ts @@ -1030,7 +1030,7 @@ export const CoreAPI = { projectId: string; dataSourceName: string; databaseId: string; - tableId: string; + tableId?: string | null; limit: number; offset: number; }): Promise< @@ -1040,9 +1040,39 @@ export const CoreAPI = { limit: number; total: number; }> + > { + let url = `${CORE_API}/projects/${projectId}/data_sources/${dataSourceName}/databases/${databaseId}/rows?limit=${limit}&offset=${offset}`; + if (tableId) { + url += `&table_id=${tableId}`; + } + const response = await fetch(url, { + method: "GET", + }); + + return _resultFromResponse(response); + }, + + async getDatabaseSchema({ + projectId, + dataSourceName, + databaseId, + }: { + projectId: string; + dataSourceName: string; + databaseId: string; + }): Promise< + CoreAPIResponse<{ + schema: Record< + string, + { + table: CoreAPIDatabaseTable; + schema: Record; + } + >; + }> > { const response = await fetch( - `${CORE_API}/projects/${projectId}/data_sources/${dataSourceName}/databases/${databaseId}/tables/${tableId}/rows?limit=${limit}&offset=${offset}`, + `${CORE_API}/projects/${projectId}/data_sources/${dataSourceName}/databases/${databaseId}/schema`, { method: "GET", } From 7e4633c09a9a5b13381cf71c486a47e59dab9aec Mon Sep 17 00:00:00 2001 From: Henry Fontanier Date: Fri, 10 Nov 2023 18:02:38 +0100 Subject: [PATCH 02/22] database row list remove support for cross table --- front/lib/core_api.ts | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/front/lib/core_api.ts b/front/lib/core_api.ts index e939bc0ccb4b..7da021e17a7c 100644 --- a/front/lib/core_api.ts +++ b/front/lib/core_api.ts @@ -1030,7 +1030,7 @@ export const CoreAPI = { projectId: string; dataSourceName: string; databaseId: string; - tableId?: string | null; + tableId: string; limit: number; offset: number; }): Promise< @@ -1041,13 +1041,12 @@ export const CoreAPI = { total: number; }> > { - let url = `${CORE_API}/projects/${projectId}/data_sources/${dataSourceName}/databases/${databaseId}/rows?limit=${limit}&offset=${offset}`; - if (tableId) { - url += `&table_id=${tableId}`; - } - const response = await fetch(url, { - method: "GET", - }); + const response = await fetch( + `${CORE_API}/projects/${projectId}/data_sources/${dataSourceName}/databases/${databaseId}/tables/${tableId}/rows?limit=${limit}&offset=${offset}`, + { + method: "GET", + } + ); return _resultFromResponse(response); }, From f91dd28c508816334ceb8f488d8218fd150ecfa8 Mon Sep 17 00:00:00 2001 From: Henry Fontanier Date: Mon, 13 Nov 2023 12:21:13 +0100 Subject: [PATCH 03/22] scaffolding --- core/bin/dust_api.rs | 61 ++++++++++- core/src/databases/database.rs | 112 +++++++++++++++++++- core/src/databases/table_schema.rs | 158 +++++++++++++++++++++++++++-- front/lib/core_api.ts | 23 +++-- 4 files changed, 332 insertions(+), 22 deletions(-) diff --git a/core/bin/dust_api.rs b/core/bin/dust_api.rs index b4aa2e7ade43..8f4bf5dcbe8e 100644 --- a/core/bin/dust_api.rs +++ b/core/bin/dust_api.rs @@ -2032,14 +2032,14 @@ async fn databases_schema_retrieve( &format!("No database found for id `{}`", database_id), None, ), - Ok(Some(db)) => match db.get_schema(&project, state.store.clone()).await { + Ok(Some(db)) => match db.get_schema(&project, state.store.clone(), false).await { Err(e) => error_response( StatusCode::INTERNAL_SERVER_ERROR, "internal_server_error", "Failed to retrieve database schema", Some(e), ), - Ok(schema) => ( + Ok((schema, _)) => ( StatusCode::OK, Json(APIResponse { error: None, @@ -2052,6 +2052,59 @@ async fn databases_schema_retrieve( } } +#[derive(serde::Deserialize)] +struct DatabaseQueryRunPayload { + query: String, +} + +async fn databases_query_run( + extract::Path((project_id, data_source_id, database_id)): extract::Path<(i64, String, String)>, + extract::Json(payload): extract::Json, + extract::Extension(state): extract::Extension>, +) -> (StatusCode, Json) { + let project = project::Project::new_from_id(project_id); + + match state + .store + .load_database(&project, &data_source_id, &database_id) + .await + { + Err(e) => error_response( + StatusCode::INTERNAL_SERVER_ERROR, + "internal_server_error", + "Failed to retrieve database", + Some(e), + ), + Ok(None) => error_response( + StatusCode::NOT_FOUND, + "database_not_found", + &format!("No database found for id `{}`", database_id), + None, + ), + Ok(Some(db)) => match db + .query(&project, state.store.clone(), &payload.query) + .await + { + Err(e) => error_response( + StatusCode::INTERNAL_SERVER_ERROR, + "internal_server_error", + "Failed to run query", + Some(e), + ), + Ok((rows, schema)) => ( + StatusCode::OK, + Json(APIResponse { + error: None, + response: Some(json!({ + "schema": schema, + "rows": rows, + })), + }), + ), + }, + } +} + // Misc #[derive(serde::Deserialize)] @@ -2273,6 +2326,10 @@ fn main() { "/projects/:project_id/data_sources/:data_source_id/databases/:database_id/schema", get(databases_schema_retrieve), ) + .route( + "/projects/:project_id/data_sources/:data_source_id/databases/:database_id/query", + post(databases_query_run), + ) // Misc .route("/tokenize", post(tokenize)) diff --git a/core/src/databases/database.rs b/core/src/databases/database.rs index 0cb6704c4a9f..29dbb433f0b6 100644 --- a/core/src/databases/database.rs +++ b/core/src/databases/database.rs @@ -3,7 +3,7 @@ use anyhow::{anyhow, Result}; use crate::{project::Project, stores::store::Store}; use rayon::prelude::*; use serde::{Deserialize, Serialize}; -use serde_json::Value; +use serde_json::{Number, Value}; use std::collections::HashMap; use super::table_schema::TableSchema; @@ -48,7 +48,8 @@ impl Database { &self, project: &Project, store: Box, - ) -> Result { + return_rows: bool, + ) -> Result<(DatabaseSchema, Option>>)> { match self.db_type { DatabaseType::REMOTE => Err(anyhow!("Remote DB not implemented.")), DatabaseType::LOCAL => { @@ -97,6 +98,94 @@ impl Database { } } + pub async fn query( + &self, + project: &Project, + store: Box, + query: &str, + ) -> Result<(Vec, TableSchema)> { + match self.db_type { + DatabaseType::REMOTE => Err(anyhow!("Remote DB not implemented.")), + DatabaseType::LOCAL => { + // Retrieve the DB schema and construct a SQL string. + let (schema, rows_by_table) = self.get_schema(project, store.clone(), true).await?; + let mut create_tables_sql = "".to_string(); + // TODO: maybe we can // ? + let mut table_schemas = HashMap::new(); + for (table_name, table) in schema.into_iter() { + if table.schema.is_empty() { + continue; + } + table_schemas.insert(table_name.clone(), table.schema.clone()); + create_tables_sql += &table + .schema + .get_create_table_sql_string(table_name.as_str()); + create_tables_sql += "\n"; + } + + // Build the in-memory SQLite DB with the schema. + let conn = rusqlite::Connection::open_in_memory()?; + conn.execute_batch(&create_tables_sql)?; + + let mut stmt = conn.prepare(query)?; + + let column_names = stmt + .column_names() + .into_iter() + .map(|x| x.to_string()) + .collect::>(); + let column_count = stmt.column_count(); + + // insert the rows in the DB + for (table_name, rows) in rows_by_table.expect("No rows found") { + if rows.is_empty() { + continue; + } + + let table_schema = table_schemas + .get(&table_name) + .expect("No schema found for table"); + + let mut insert_sql = "".to_string(); + for row in rows { + let insert_row_sql = + table_schema.get_insert_row_sql_string(&table_name, row.content())?; + insert_sql += &insert_row_sql; + } + conn.execute_batch(&insert_sql)?; + } + + let rows = stmt.query_map([], |row| { + let mut map = serde_json::Map::new(); + for i in 0..column_count { + let column_name = column_names.get(i).expect("Invalid column name"); + let value = match row.get(i).expect("Invalid value") { + rusqlite::types::Value::Integer(i) => Value::Number(i.into()), + rusqlite::types::Value::Real(f) => { + Value::Number(Number::from_f64(f).expect("invalid float value")) + } + rusqlite::types::Value::Text(t) => Value::String(t), + // convert blob into string + rusqlite::types::Value::Blob(b) => { + Value::String(String::from_utf8(b).expect("Invalid UTF-8 sequence")) + } + + rusqlite::types::Value::Null => Value::Null, + }; + map.insert(column_name.to_string(), value); + } + Ok(Value::Object(map)) + })?; + + let results = rows.collect::, rusqlite::Error>>()?; + let results_refs = results.iter().collect::>(); + let table_schema = TableSchema::from_rows(&results_refs)?; + + Ok((results, table_schema)) + } + } + } + // Getters pub fn created(&self) -> u64 { self.created @@ -182,7 +271,7 @@ impl DatabaseRow { } #[derive(Debug, Serialize)] -struct DatabaseSchemaTable { +pub struct DatabaseSchemaTable { table: DatabaseTable, schema: TableSchema, } @@ -191,7 +280,24 @@ impl DatabaseSchemaTable { pub fn new(table: DatabaseTable, schema: TableSchema) -> Self { DatabaseSchemaTable { table, schema } } + + pub fn table(&self) -> &DatabaseTable { + &self.table + } + + pub fn is_empty(&self) -> bool { + self.schema.is_empty() + } } #[derive(Debug, Serialize)] pub struct DatabaseSchema(HashMap); + +impl IntoIterator for DatabaseSchema { + type Item = (String, DatabaseSchemaTable); + type IntoIter = std::collections::hash_map::IntoIter; + + fn into_iter(self) -> Self::IntoIter { + self.0.into_iter() + } +} diff --git a/core/src/databases/table_schema.rs b/core/src/databases/table_schema.rs index 61ed3089f2a8..8838bf7128c5 100644 --- a/core/src/databases/table_schema.rs +++ b/core/src/databases/table_schema.rs @@ -23,6 +23,9 @@ impl TableSchema { pub fn empty() -> Self { Self(HashMap::new()) } + pub fn is_empty(&self) -> bool { + self.0.is_empty() + } pub fn from_rows(rows: &Vec) -> Result { let mut schema = HashMap::new(); @@ -72,8 +75,8 @@ impl TableSchema { Ok(Self(schema)) } - pub fn to_sql_string(&self, table_name: &str) -> String { - let mut create_table = format!("CREATE TABLE {} (", table_name); + pub fn get_create_table_sql_string(&self, table_name: &str) -> String { + let mut create_table = format!("CREATE TABLE \"{}\" (", table_name); for (name, field_type) in &self.0 { let sql_type = match field_type { @@ -83,7 +86,7 @@ impl TableSchema { TableSchemaFieldType::Bool => "BOOLEAN", }; - create_table.push_str(&format!("{} {}, ", name, sql_type)); + create_table.push_str(&format!("\"{}\" {}, ", name, sql_type)); } // Remove the trailing comma and space, then close the parentheses. @@ -93,6 +96,56 @@ impl TableSchema { create_table } + + pub fn get_insert_row_sql_string( + &self, + table_name: &str, + row_content: &Value, + ) -> Result { + let row_content = row_content + .as_object() + .ok_or_else(|| anyhow!("Row content is not an object"))?; + + let mut insert_row = format!("INSERT INTO \"{}\" (", table_name); + + for (name, _) in &self.0 { + insert_row.push_str(&format!("\"{}\", ", name)); + } + + // Remove the trailing comma and space, then close the parentheses. + let len = insert_row.len(); + insert_row.truncate(len - 2); + insert_row.push_str(") VALUES ("); + + for (name, _) in &self.0 { + let value = row_content.get(name); + + // if the value is not present, it's a null value + let value = value.unwrap_or(&Value::Null); + + let sql_value = match value { + Value::Null => "NULL".to_string(), + Value::Bool(x) => x.to_string(), + Value::Number(x) => x.to_string(), + Value::String(x) => format!("\"{}\"", x), + Value::Object(_) | Value::Array(_) => { + return Err(anyhow!( + "Row content field {} is not a primitive type", + name + )) + } + }; + + insert_row.push_str(&format!("{}, ", sql_value)); + } + + // Remove the trailing comma and space, then close the parentheses. + let len = insert_row.len(); + insert_row.truncate(len - 2); + insert_row.push_str(");"); + + Ok(insert_row) + } } #[cfg(test)] @@ -193,7 +246,7 @@ mod tests { } #[test] - fn test_table_schema_to_string() -> Result<()> { + fn test_table_schema_get_create_table_sql_string() -> Result<()> { let schema_map: HashMap = [ ("field1", TableSchemaFieldType::Int), ("field2", TableSchemaFieldType::Float), @@ -206,9 +259,7 @@ mod tests { let schema = TableSchema(schema_map); - let sql = schema.to_sql_string("test_table"); - - println!("{}", sql); + let sql = schema.get_create_table_sql_string("test_table"); let conn = Connection::open_in_memory()?; conn.execute(&sql, [])?; @@ -229,4 +280,97 @@ mod tests { Ok(()) } + + #[test] + fn test_get_insert_row_sql_string_success() -> Result<()> { + let schema = create_test_schema()?; + let conn = setup_in_memory_db(&schema)?; + + let row_content = json!({ + "field1": 1, + "field2": 2.4, + "field3": "text", + "field4": true + }); + + let sql_string = schema.get_insert_row_sql_string("test_table", &row_content)?; + conn.execute(&sql_string, [])?; + + let mut stmt = conn.prepare("SELECT * FROM test_table;")?; + let mut rows = stmt.query([])?; + + if let Some(row) = rows.next()? { + let field1: i64 = row.get("field1")?; + let field2: f64 = row.get("field2")?; + let field3: String = row.get("field3")?; + let field4: bool = row.get("field4")?; + + assert_eq!(field1, 1); + assert_eq!(field2, 2.4); + assert_eq!(field3, "text"); + assert_eq!(field4, true); + } else { + return Err(anyhow!("No rows found after insert")); + } + + Ok(()) + } + + #[test] + fn test_get_insert_row_sql_string_missing_fields() -> Result<()> { + let schema = create_test_schema()?; + let conn = setup_in_memory_db(&schema)?; + + let row_content = json!({ + "field1": 1, + "field2": 2.4 + // Missing field3 and field4 + }); + + let sql_string = schema.get_insert_row_sql_string("test_table", &row_content)?; + conn.execute(&sql_string, [])?; + + let mut stmt = conn.prepare("SELECT * FROM test_table;")?; + let mut rows = stmt.query([])?; + + if let Some(row) = rows.next()? { + let field1: i64 = row.get("field1")?; + let field2: f64 = row.get("field2")?; + let field3: Option = row.get("field3")?; + let field4: Option = row.get("field4")?; + + assert_eq!(field1, 1); + assert_eq!(field2, 2.4); + assert_eq!(field3, None); + assert_eq!(field4, None); + } else { + return Err(anyhow!("No rows found after insert")); + } + + Ok(()) + } + + // Helper function to create a test schema + fn create_test_schema() -> Result { + let schema_map: HashMap = [ + ("field1", TableSchemaFieldType::Int), + ("field2", TableSchemaFieldType::Float), + ("field3", TableSchemaFieldType::Text), + ("field4", TableSchemaFieldType::Bool), + ] + .iter() + .cloned() + .map(|(field_id, field_type)| (field_id.to_string(), field_type)) + .collect(); + + Ok(TableSchema(schema_map)) + } + + // Helper function to set up an in-memory database with a test table + fn setup_in_memory_db(schema: &TableSchema) -> Result { + let conn = Connection::open_in_memory()?; + let sql_create_table = schema.get_create_table_sql_string("test_table"); + conn.execute(&sql_create_table, [])?; + Ok(conn) + } } diff --git a/front/lib/core_api.ts b/front/lib/core_api.ts index 7da021e17a7c..7e2e3029d429 100644 --- a/front/lib/core_api.ts +++ b/front/lib/core_api.ts @@ -1080,29 +1080,32 @@ export const CoreAPI = { return _resultFromResponse(response); }, - async getDatabaseSchema({ + async queryDatabase({ projectId, dataSourceName, databaseId, + query, }: { projectId: string; dataSourceName: string; databaseId: string; + query: string; }): Promise< CoreAPIResponse<{ - schema: Record< - string, - { - table: CoreAPIDatabaseTable; - schema: Record; - } - >; + schema: Record; + rows: Record[]; }> > { const response = await fetch( - `${CORE_API}/projects/${projectId}/data_sources/${dataSourceName}/databases/${databaseId}/schema`, + `${CORE_API}/projects/${projectId}/data_sources/${dataSourceName}/databases/${databaseId}/query`, { - method: "GET", + method: "POST", + headers: { + "Content-Type": "application/json", + }, + body: JSON.stringify({ + query: query, + }), } ); From bd61cf193430c6151b7c9d8894322dad6683977c Mon Sep 17 00:00:00 2001 From: Henry Fontanier Date: Mon, 13 Nov 2023 16:06:58 +0100 Subject: [PATCH 04/22] remove another for loop --- core/src/databases/database.rs | 39 +++++++++++++++++----------------- 1 file changed, 20 insertions(+), 19 deletions(-) diff --git a/core/src/databases/database.rs b/core/src/databases/database.rs index 29dbb433f0b6..f5b08cf6dbf9 100644 --- a/core/src/databases/database.rs +++ b/core/src/databases/database.rs @@ -109,19 +109,23 @@ impl Database { DatabaseType::LOCAL => { // Retrieve the DB schema and construct a SQL string. let (schema, rows_by_table) = self.get_schema(project, store.clone(), true).await?; - let mut create_tables_sql = "".to_string(); - // TODO: maybe we can // ? - let mut table_schemas = HashMap::new(); - for (table_name, table) in schema.into_iter() { - if table.schema.is_empty() { - continue; - } - table_schemas.insert(table_name.clone(), table.schema.clone()); - create_tables_sql += &table - .schema - .get_create_table_sql_string(table_name.as_str()); - create_tables_sql += "\n"; - } + + let table_schemas: HashMap = schema + .iter() + .filter(|(_, table)| !table.schema.is_empty()) + .map(|(table_name, table)| (table_name.clone(), table.schema.clone())) + .collect(); + + let create_tables_sql: String = schema + .iter() + .filter(|(_, table)| !table.schema.is_empty()) + .map(|(table_name, table)| { + table + .schema + .get_create_table_sql_string(table_name.as_str()) + }) + .collect::>() + .join("\n"); // Build the in-memory SQLite DB with the schema. let conn = rusqlite::Connection::open_in_memory()?; @@ -293,11 +297,8 @@ impl DatabaseSchemaTable { #[derive(Debug, Serialize)] pub struct DatabaseSchema(HashMap); -impl IntoIterator for DatabaseSchema { - type Item = (String, DatabaseSchemaTable); - type IntoIter = std::collections::hash_map::IntoIter; - - fn into_iter(self) -> Self::IntoIter { - self.0.into_iter() +impl DatabaseSchema { + pub fn iter(&self) -> std::collections::hash_map::Iter { + self.0.iter() } } From 4fdfc43e958123884bce8ba1d2bbc9738e1f9d76 Mon Sep 17 00:00:00 2001 From: Henry Fontanier Date: Mon, 13 Nov 2023 16:19:02 +0100 Subject: [PATCH 05/22] add some DSSTRUCTSTAT --- core/src/databases/database.rs | 40 +++++++++++++++++++++++++++++++++- 1 file changed, 39 insertions(+), 1 deletion(-) diff --git a/core/src/databases/database.rs b/core/src/databases/database.rs index f5b08cf6dbf9..1811d025eeab 100644 --- a/core/src/databases/database.rs +++ b/core/src/databases/database.rs @@ -1,6 +1,6 @@ use anyhow::{anyhow, Result}; -use crate::{project::Project, stores::store::Store}; +use crate::{project::Project, stores::store::Store, utils}; use rayon::prelude::*; use serde::{Deserialize, Serialize}; use serde_json::{Number, Value}; @@ -107,8 +107,13 @@ impl Database { match self.db_type { DatabaseType::REMOTE => Err(anyhow!("Remote DB not implemented.")), DatabaseType::LOCAL => { + let time_query_start = utils::now(); // Retrieve the DB schema and construct a SQL string. let (schema, rows_by_table) = self.get_schema(project, store.clone(), true).await?; + utils::done(&format!( + "DSSTRUCTSTAT Finished retrieving schema: duration={}ms", + utils::now() - time_query_start + )); let table_schemas: HashMap = schema .iter() @@ -116,6 +121,7 @@ impl Database { .map(|(table_name, table)| (table_name.clone(), table.schema.clone())) .collect(); + let generate_create_table_sql_start = utils::now(); let create_tables_sql: String = schema .iter() .filter(|(_, table)| !table.schema.is_empty()) @@ -126,10 +132,20 @@ impl Database { }) .collect::>() .join("\n"); + utils::done(&format!( + "DSSTRUCTSTAT Finished generating create table SQL: duration={}ms", + utils::now() - generate_create_table_sql_start + )); // Build the in-memory SQLite DB with the schema. let conn = rusqlite::Connection::open_in_memory()?; + + let create_tables_execute_start = utils::now(); conn.execute_batch(&create_tables_sql)?; + utils::done(&format!( + "DSSTRUCTSTAT Finished creating tables: duration={}ms", + utils::now() - create_tables_execute_start + )); let mut stmt = conn.prepare(query)?; @@ -141,6 +157,7 @@ impl Database { let column_count = stmt.column_count(); // insert the rows in the DB + let insert_execute_start = utils::now(); for (table_name, rows) in rows_by_table.expect("No rows found") { if rows.is_empty() { continue; @@ -156,9 +173,15 @@ impl Database { table_schema.get_insert_row_sql_string(&table_name, row.content())?; insert_sql += &insert_row_sql; } + conn.execute_batch(&insert_sql)?; } + utils::done(&format!( + "DSSTRUCTSTAT Finished inserting rows: duration={}ms", + utils::now() - insert_execute_start + )); + let user_query_execute_start = utils::now(); let rows = stmt.query_map([], |row| { let mut map = serde_json::Map::new(); for i in 0..column_count { @@ -180,10 +203,25 @@ impl Database { } Ok(Value::Object(map)) })?; + utils::done(&format!( + "DSSTRUCTSTAT Finished executing user query: duration={}ms", + utils::now() - user_query_execute_start + )); let results = rows.collect::, rusqlite::Error>>()?; let results_refs = results.iter().collect::>(); + + let infer_result_schema_start = utils::now(); let table_schema = TableSchema::from_rows(&results_refs)?; + utils::done(&format!( + "DSSTRUCTSTAT Finished inferring schema: duration={}ms", + utils::now() - infer_result_schema_start + )); + + utils::done(&format!( + "DSSTRUCTSTAT Finished query database: duration={}ms", + utils::now() - time_query_start + )); Ok((results, table_schema)) } From 6504eb8a1cc1c6b6ed0440d0ba0674791e379d13 Mon Sep 17 00:00:00 2001 From: Henry Fontanier Date: Mon, 13 Nov 2023 16:50:46 +0100 Subject: [PATCH 06/22] don't panic --- core/src/databases/database.rs | 74 +++++++++++++++++++++++----------- 1 file changed, 50 insertions(+), 24 deletions(-) diff --git a/core/src/databases/database.rs b/core/src/databases/database.rs index 1811d025eeab..f8fc45c65987 100644 --- a/core/src/databases/database.rs +++ b/core/src/databases/database.rs @@ -3,7 +3,7 @@ use anyhow::{anyhow, Result}; use crate::{project::Project, stores::store::Store, utils}; use rayon::prelude::*; use serde::{Deserialize, Serialize}; -use serde_json::{Number, Value}; +use serde_json::Value; use std::collections::HashMap; use super::table_schema::TableSchema; @@ -110,6 +110,11 @@ impl Database { let time_query_start = utils::now(); // Retrieve the DB schema and construct a SQL string. let (schema, rows_by_table) = self.get_schema(project, store.clone(), true).await?; + let rows_by_table = match rows_by_table { + Some(rows) => rows, + None => return Err(anyhow!("No rows found")), + }; + utils::done(&format!( "DSSTRUCTSTAT Finished retrieving schema: duration={}ms", utils::now() - time_query_start @@ -158,14 +163,14 @@ impl Database { // insert the rows in the DB let insert_execute_start = utils::now(); - for (table_name, rows) in rows_by_table.expect("No rows found") { + for (table_name, rows) in rows_by_table { if rows.is_empty() { continue; } let table_schema = table_schemas .get(&table_name) - .expect("No schema found for table"); + .ok_or_else(|| anyhow!("No schema found for table {}", table_name))?; let mut insert_sql = "".to_string(); for row in rows { @@ -182,33 +187,54 @@ impl Database { )); let user_query_execute_start = utils::now(); - let rows = stmt.query_map([], |row| { - let mut map = serde_json::Map::new(); - for i in 0..column_count { - let column_name = column_names.get(i).expect("Invalid column name"); - let value = match row.get(i).expect("Invalid value") { - rusqlite::types::Value::Integer(i) => Value::Number(i.into()), - rusqlite::types::Value::Real(f) => { - Value::Number(Number::from_f64(f).expect("invalid float value")) - } - rusqlite::types::Value::Text(t) => Value::String(t), - // convert blob into string - rusqlite::types::Value::Blob(b) => { - Value::String(String::from_utf8(b).expect("Invalid UTF-8 sequence")) - } - - rusqlite::types::Value::Null => Value::Null, - }; - map.insert(column_name.to_string(), value); - } - Ok(Value::Object(map)) + // Execute the query and get an iterator over the mapped rows. + let mapped_rows = stmt.query_map([], |row| { + (0..column_count) + .map(|i| row.get::(i)) + .collect::, rusqlite::Error>>() })?; + + let results = mapped_rows.map(|row_result| { + row_result + .map_err(|e| anyhow!("Failed to retrieve a row: {}", e)) + .and_then(|row| { + column_names.iter().enumerate().try_fold( + serde_json::Map::new(), + |mut acc, (i, column_name)| { + row.get(i) + .ok_or_else(|| { + anyhow!("Missing value at index {} for column {}", i, column_name) + }) + .and_then(|sql_value| { + let json_value = match sql_value { + rusqlite::types::Value::Integer(i) => serde_json::Value::Number((*i).into()), + rusqlite::types::Value::Real(f) => serde_json::Number::from_f64(*f) + .ok_or_else(|| { + anyhow!("Invalid float value for column {}", column_name) + }) + .map(serde_json::Value::Number)?, + rusqlite::types::Value::Text(t) => serde_json::Value::String(t.clone()), + rusqlite::types::Value::Blob(b) => String::from_utf8(b.clone()) + .map_err(|_| { + anyhow!("Invalid UTF-8 sequence for column {}", column_name) + }) + .map(serde_json::Value::String)?, + rusqlite::types::Value::Null => serde_json::Value::Null, + }; + acc.insert(column_name.clone(), json_value); + Ok(acc) + }) + }, + ) + .map(serde_json::Value::Object) + }) + }) + .collect::, anyhow::Error>>()?; utils::done(&format!( "DSSTRUCTSTAT Finished executing user query: duration={}ms", utils::now() - user_query_execute_start )); - let results = rows.collect::, rusqlite::Error>>()?; let results_refs = results.iter().collect::>(); let infer_result_schema_start = utils::now(); From 0bb795c16a87924afef0585875582f31bae7e06f Mon Sep 17 00:00:00 2001 From: Henry Fontanier Date: Mon, 13 Nov 2023 17:47:05 +0100 Subject: [PATCH 07/22] Refactor database insertions to use prepared statements --- core/src/databases/database.rs | 14 +++-- core/src/databases/table_schema.rs | 97 ++++++++++++++++-------------- 2 files changed, 62 insertions(+), 49 deletions(-) diff --git a/core/src/databases/database.rs b/core/src/databases/database.rs index f8fc45c65987..f47107d083f7 100644 --- a/core/src/databases/database.rs +++ b/core/src/databases/database.rs @@ -1,4 +1,5 @@ use anyhow::{anyhow, Result}; +use rusqlite::ToSql; use crate::{project::Project, stores::store::Store, utils}; use rayon::prelude::*; @@ -172,14 +173,17 @@ impl Database { .get(&table_name) .ok_or_else(|| anyhow!("No schema found for table {}", table_name))?; - let mut insert_sql = "".to_string(); for row in rows { - let insert_row_sql = + let (query, boxed_params) = table_schema.get_insert_row_sql_string(&table_name, row.content())?; - insert_sql += &insert_row_sql; - } - conn.execute_batch(&insert_sql)?; + let params_refs: Vec<&dyn ToSql> = boxed_params + .iter() + .map(|param| &**param as &dyn ToSql) + .collect(); + + conn.execute(&query, params_refs.as_slice())?; + } } utils::done(&format!( "DSSTRUCTSTAT Finished inserting rows: duration={}ms", diff --git a/core/src/databases/table_schema.rs b/core/src/databases/table_schema.rs index 8838bf7128c5..c6e7aa3b0937 100644 --- a/core/src/databases/table_schema.rs +++ b/core/src/databases/table_schema.rs @@ -2,6 +2,7 @@ use std::collections::HashMap; use anyhow::{anyhow, Result}; +use rusqlite::ToSql; use serde::{Deserialize, Serialize}; use serde_json::Value; @@ -101,50 +102,45 @@ impl TableSchema { &self, table_name: &str, row_content: &Value, - ) -> Result { - let row_content = row_content + ) -> Result<(String, Vec>)> { + let row_content_map = row_content .as_object() .ok_or_else(|| anyhow!("Row content is not an object"))?; - let mut insert_row = format!("INSERT INTO \"{}\" (", table_name); - - for (name, _) in &self.0 { - insert_row.push_str(&format!("\"{}\", ", name)); - } - - // Remove the trailing comma and space, then close the parentheses. - let len = insert_row.len(); - insert_row.truncate(len - 2); - insert_row.push_str(") VALUES ("); - - for (name, _) in &self.0 { - let value = row_content.get(name); - - // if the value is not present, it's a null value - let value = value.unwrap_or(&Value::Null); - - let sql_value = match value { - Value::Null => "NULL".to_string(), - Value::Bool(x) => x.to_string(), - Value::Number(x) => x.to_string(), - Value::String(x) => format!("\"{}\"", x), - Value::Object(_) | Value::Array(_) => { - return Err(anyhow!( - "Row content field {} is not a primitive type", - name - )) - } - }; - - insert_row.push_str(&format!("{}, ", sql_value)); - } - - // Remove the trailing comma and space, then close the parentheses. - let len = insert_row.len(); - insert_row.truncate(len - 2); - insert_row.push_str(");"); + let field_names: Vec<&String> = self.0.keys().collect(); + let fields = field_names + .iter() + .map(|name| format!("\"{}\"", name)) + .collect::>() + .join(", "); + let placeholders = field_names + .iter() + .enumerate() + .map(|(i, _)| format!("?{}", i + 1)) + .collect::>() + .join(", "); + + let params: Vec> = field_names + .iter() + .map(|name| match row_content_map.get(*name) { + Some(Value::Bool(b)) => Ok(Box::new(*b) as Box), + Some(Value::Number(n)) => n + .as_i64() + .map(|i| Box::new(i) as Box) + .or_else(|| n.as_f64().map(|f| Box::new(f) as Box)) + .ok_or_else(|| anyhow!("Invalid number value for field {}", name)), + Some(Value::String(s)) => Ok(Box::new(s.clone()) as Box), + Some(Value::Null) | None => Ok(Box::new(rusqlite::types::Null) as Box), + Some(_) => Err(anyhow!("Unsupported value type for field {}", name)), + }) + .collect::>>>()?; + + let insert_row = format!( + "INSERT INTO \"{}\" ({}) VALUES ({});", + table_name, fields, placeholders + ); - Ok(insert_row) + Ok((insert_row, params)) } } @@ -293,8 +289,15 @@ mod tests { "field4": true }); - let sql_string = schema.get_insert_row_sql_string("test_table", &row_content)?; - conn.execute(&sql_string, [])?; + let (sql_string, boxed_params) = + schema.get_insert_row_sql_string("test_table", &row_content)?; + + let params_refs: Vec<&dyn ToSql> = boxed_params + .iter() + .map(|param| &**param as &dyn ToSql) + .collect(); + + conn.execute(&sql_string, params_refs.as_slice())?; let mut stmt = conn.prepare("SELECT * FROM test_table;")?; let mut rows = stmt.query([])?; @@ -327,8 +330,14 @@ mod tests { // Missing field3 and field4 }); - let sql_string = schema.get_insert_row_sql_string("test_table", &row_content)?; - conn.execute(&sql_string, [])?; + let (sql_string, boxed_params) = + schema.get_insert_row_sql_string("test_table", &row_content)?; + let params_refs: Vec<&dyn ToSql> = boxed_params + .iter() + .map(|param| &**param as &dyn ToSql) + .collect(); + + conn.execute(&sql_string, params_refs.as_slice())?; let mut stmt = conn.prepare("SELECT * FROM test_table;")?; let mut rows = stmt.query([])?; From 3092df08954f12b5b0ade8007ad0db12404a6d91 Mon Sep 17 00:00:00 2001 From: Henry Fontanier Date: Mon, 13 Nov 2023 17:51:13 +0100 Subject: [PATCH 08/22] aesthetic nit --- core/src/databases/database.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/databases/database.rs b/core/src/databases/database.rs index f47107d083f7..6cea6e6b55cd 100644 --- a/core/src/databases/database.rs +++ b/core/src/databases/database.rs @@ -115,7 +115,6 @@ impl Database { Some(rows) => rows, None => return Err(anyhow!("No rows found")), }; - utils::done(&format!( "DSSTRUCTSTAT Finished retrieving schema: duration={}ms", utils::now() - time_query_start From ab048dfcf0899fa1408e59d01159fa1495657d57 Mon Sep 17 00:00:00 2001 From: Henry Fontanier Date: Tue, 14 Nov 2023 17:59:22 +0100 Subject: [PATCH 09/22] fix rebase --- core/src/databases/database.rs | 47 +++++++++++++++++++++------------- 1 file changed, 29 insertions(+), 18 deletions(-) diff --git a/core/src/databases/database.rs b/core/src/databases/database.rs index 6cea6e6b55cd..a94c89a8dae2 100644 --- a/core/src/databases/database.rs +++ b/core/src/databases/database.rs @@ -85,15 +85,28 @@ impl Database { .into_iter() .collect::>(); - Ok(DatabaseSchema( - rows.into_par_iter() - .map(|(table, rows)| { - Ok(( - table.table_id().to_string(), - DatabaseSchemaTable::new(table, TableSchema::from_rows(&rows)?), - )) - }) - .collect::>>()?, + let returned_rows = match return_rows { + true => Some( + rows.clone() + .into_iter() + .map(|(table, rows)| (table.table_id().to_string(), rows)) + .collect::>(), + ), + false => None, + }; + + Ok(( + DatabaseSchema( + rows.into_par_iter() + .map(|(table, r)| { + Ok(( + table.table_id().to_string(), + DatabaseSchemaTable::new(table, TableSchema::from_rows(&r)?), + )) + }) + .collect::>>()?, + ), + returned_rows, )) } } @@ -104,7 +117,7 @@ impl Database { project: &Project, store: Box, query: &str, - ) -> Result<(Vec, TableSchema)> { + ) -> Result<(Vec, TableSchema)> { match self.db_type { DatabaseType::REMOTE => Err(anyhow!("Remote DB not implemented.")), DatabaseType::LOCAL => { @@ -238,10 +251,12 @@ impl Database { utils::now() - user_query_execute_start )); - let results_refs = results.iter().collect::>(); - let infer_result_schema_start = utils::now(); - let table_schema = TableSchema::from_rows(&results_refs)?; + let result_rows = results + .into_par_iter() + .map(|v| DatabaseRow::new(utils::now(), None, &v)) + .collect::>(); + let table_schema = TableSchema::from_rows(&result_rows)?; utils::done(&format!( "DSSTRUCTSTAT Finished inferring schema: duration={}ms", utils::now() - infer_result_schema_start @@ -252,7 +267,7 @@ impl Database { utils::now() - time_query_start )); - Ok((results, table_schema)) + Ok((result_rows, table_schema)) } } } @@ -352,10 +367,6 @@ impl DatabaseSchemaTable { DatabaseSchemaTable { table, schema } } - pub fn table(&self) -> &DatabaseTable { - &self.table - } - pub fn is_empty(&self) -> bool { self.schema.is_empty() } From e8c33e9bee8228c1ee8036139d9e68506ded948e Mon Sep 17 00:00:00 2001 From: Henry Fontanier Date: Tue, 14 Nov 2023 19:06:50 +0100 Subject: [PATCH 10/22] spoluify the code a bit --- core/src/databases/database.rs | 156 ++++++++++++++++++--------------- 1 file changed, 86 insertions(+), 70 deletions(-) diff --git a/core/src/databases/database.rs b/core/src/databases/database.rs index a94c89a8dae2..da66a3dba7f1 100644 --- a/core/src/databases/database.rs +++ b/core/src/databases/database.rs @@ -1,8 +1,7 @@ -use anyhow::{anyhow, Result}; -use rusqlite::ToSql; - use crate::{project::Project, stores::store::Store, utils}; +use anyhow::{anyhow, Result}; use rayon::prelude::*; +use rusqlite::ToSql; use serde::{Deserialize, Serialize}; use serde_json::Value; use std::collections::HashMap; @@ -176,86 +175,103 @@ impl Database { // insert the rows in the DB let insert_execute_start = utils::now(); - for (table_name, rows) in rows_by_table { - if rows.is_empty() { - continue; - } - - let table_schema = table_schemas - .get(&table_name) - .ok_or_else(|| anyhow!("No schema found for table {}", table_name))?; - - for row in rows { - let (query, boxed_params) = - table_schema.get_insert_row_sql_string(&table_name, row.content())?; - - let params_refs: Vec<&dyn ToSql> = boxed_params - .iter() - .map(|param| &**param as &dyn ToSql) - .collect(); - - conn.execute(&query, params_refs.as_slice())?; - } - } + rows_by_table + .iter() + .filter(|(_, rows)| !rows.is_empty()) + .map(|(table_name, rows)| { + let table_schema = table_schemas + .get(table_name) + .ok_or_else(|| anyhow!("No schema found for table {}", table_name))?; + + rows.iter() + .map(|row| { + match table_schema + .get_insert_row_sql_string(table_name, &row.content) + { + Ok((query, boxed_params)) => { + let params_refs: Vec<&dyn ToSql> = boxed_params + .iter() + .map(|param| &**param as &dyn ToSql) + .collect(); + + match conn.execute(&query, params_refs.as_slice()) { + Ok(res) => Ok(res), + Err(e) => Err(anyhow!("Error: {}", e)), + } + } + Err(e) => Err(anyhow!("Error: {}", e)), + } + }) + .collect::>>() + }) + .collect::>>()?; utils::done(&format!( "DSSTRUCTSTAT Finished inserting rows: duration={}ms", utils::now() - insert_execute_start )); let user_query_execute_start = utils::now(); - // Execute the query and get an iterator over the mapped rows. - let mapped_rows = stmt.query_map([], |row| { - (0..column_count) - .map(|i| row.get::(i)) - .collect::, rusqlite::Error>>() - })?; - - let results = mapped_rows.map(|row_result| { - row_result - .map_err(|e| anyhow!("Failed to retrieve a row: {}", e)) - .and_then(|row| { - column_names.iter().enumerate().try_fold( - serde_json::Map::new(), - |mut acc, (i, column_name)| { - row.get(i) - .ok_or_else(|| { - anyhow!("Missing value at index {} for column {}", i, column_name) - }) - .and_then(|sql_value| { - let json_value = match sql_value { - rusqlite::types::Value::Integer(i) => serde_json::Value::Number((*i).into()), - rusqlite::types::Value::Real(f) => serde_json::Number::from_f64(*f) - .ok_or_else(|| { - anyhow!("Invalid float value for column {}", column_name) - }) - .map(serde_json::Value::Number)?, - rusqlite::types::Value::Text(t) => serde_json::Value::String(t.clone()), - rusqlite::types::Value::Blob(b) => String::from_utf8(b.clone()) - .map_err(|_| { - anyhow!("Invalid UTF-8 sequence for column {}", column_name) - }) - .map(serde_json::Value::String)?, - rusqlite::types::Value::Null => serde_json::Value::Null, - }; - acc.insert(column_name.clone(), json_value); - Ok(acc) - }) - }, - ) - .map(serde_json::Value::Object) + // Execute the query and collect the results in a vector of serde_json::Value objects. + let result_rows = stmt + .query_and_then([], |row| { + column_names + .iter() + .enumerate() + .map(|(i, column_name)| { + Ok(( + column_name.clone(), + match row.get(i) { + Err(e) => { + return Err(anyhow!( + "Failed to retrieve value for column {}: {}", + column_name, + e + )) + } + Ok(v) => match v { + rusqlite::types::Value::Integer(i) => { + Ok(serde_json::Value::Number(i.into())) + } + rusqlite::types::Value::Real(f) => { + match serde_json::Number::from_f64(f) { + Some(n) => Ok(serde_json::Value::Number(n)), + None => Err(anyhow!( + "Invalid float value for column {}", + column_name + )), + } + } + rusqlite::types::Value::Text(t) => { + Ok(serde_json::Value::String(t.clone())) + } + rusqlite::types::Value::Blob(b) => { + match String::from_utf8(b.clone()) { + Err(_) => Err(anyhow!( + "Invalid UTF-8 sequence for column {}", + column_name + )), + Ok(s) => Ok(serde_json::Value::String(s)), + } + } + rusqlite::types::Value::Null => { + Ok(serde_json::Value::Null) + } + }, + }?, + )) }) - }) - .collect::, anyhow::Error>>()?; + .collect::>() + })? + .collect::>>()? + .into_par_iter() + .map(|v| DatabaseRow::new(utils::now(), None, &v)) + .collect::>(); utils::done(&format!( "DSSTRUCTSTAT Finished executing user query: duration={}ms", utils::now() - user_query_execute_start )); let infer_result_schema_start = utils::now(); - let result_rows = results - .into_par_iter() - .map(|v| DatabaseRow::new(utils::now(), None, &v)) - .collect::>(); let table_schema = TableSchema::from_rows(&result_rows)?; utils::done(&format!( "DSSTRUCTSTAT Finished inferring schema: duration={}ms", From 9e6e7a730b973834a5ff816270f771712d5ee8e1 Mon Sep 17 00:00:00 2001 From: Henry Fontanier Date: Tue, 14 Nov 2023 19:19:02 +0100 Subject: [PATCH 11/22] split in 2 functiong --- core/src/databases/database.rs | 58 ++++++++++++++++++++++------------ 1 file changed, 37 insertions(+), 21 deletions(-) diff --git a/core/src/databases/database.rs b/core/src/databases/database.rs index da66a3dba7f1..59f0b7c4f6a4 100644 --- a/core/src/databases/database.rs +++ b/core/src/databases/database.rs @@ -1,7 +1,7 @@ use crate::{project::Project, stores::store::Store, utils}; use anyhow::{anyhow, Result}; use rayon::prelude::*; -use rusqlite::ToSql; +use rusqlite::{Connection, ToSql}; use serde::{Deserialize, Serialize}; use serde_json::Value; use std::collections::HashMap; @@ -111,17 +111,17 @@ impl Database { } } - pub async fn query( + pub async fn create_in_memory_sqlite_conn( &self, project: &Project, store: Box, - query: &str, - ) -> Result<(Vec, TableSchema)> { + ) -> Result { match self.db_type { - DatabaseType::REMOTE => Err(anyhow!("Remote DB not implemented.")), + DatabaseType::REMOTE => Err(anyhow!( + "Cannot build an in-memory SQLite DB for a remote database." + )), DatabaseType::LOCAL => { - let time_query_start = utils::now(); - // Retrieve the DB schema and construct a SQL string. + let time_build_db_start = utils::now(); let (schema, rows_by_table) = self.get_schema(project, store.clone(), true).await?; let rows_by_table = match rows_by_table { Some(rows) => rows, @@ -129,7 +129,7 @@ impl Database { }; utils::done(&format!( "DSSTRUCTSTAT Finished retrieving schema: duration={}ms", - utils::now() - time_query_start + utils::now() - time_build_db_start )); let table_schemas: HashMap = schema @@ -154,7 +154,6 @@ impl Database { utils::now() - generate_create_table_sql_start )); - // Build the in-memory SQLite DB with the schema. let conn = rusqlite::Connection::open_in_memory()?; let create_tables_execute_start = utils::now(); @@ -164,16 +163,6 @@ impl Database { utils::now() - create_tables_execute_start )); - let mut stmt = conn.prepare(query)?; - - let column_names = stmt - .column_names() - .into_iter() - .map(|x| x.to_string()) - .collect::>(); - let column_count = stmt.column_count(); - - // insert the rows in the DB let insert_execute_start = utils::now(); rows_by_table .iter() @@ -210,7 +199,34 @@ impl Database { utils::now() - insert_execute_start )); - let user_query_execute_start = utils::now(); + Ok(conn) + } + } + } + + pub async fn query( + &self, + project: &Project, + store: Box, + query: &str, + ) -> Result<(Vec, TableSchema)> { + match self.db_type { + DatabaseType::REMOTE => Err(anyhow!("Remote DB not implemented.")), + DatabaseType::LOCAL => { + let conn = self + .create_in_memory_sqlite_conn(project, store.clone()) + .await?; + + let time_query_start = utils::now(); + + let mut stmt = conn.prepare(query)?; + + let column_names = stmt + .column_names() + .into_iter() + .map(|x| x.to_string()) + .collect::>(); + // Execute the query and collect the results in a vector of serde_json::Value objects. let result_rows = stmt .query_and_then([], |row| { @@ -268,7 +284,7 @@ impl Database { .collect::>(); utils::done(&format!( "DSSTRUCTSTAT Finished executing user query: duration={}ms", - utils::now() - user_query_execute_start + utils::now() - time_query_start )); let infer_result_schema_start = utils::now(); From d4dd0faf60126f860b676e6398688643232bc01d Mon Sep 17 00:00:00 2001 From: Henry Fontanier Date: Tue, 14 Nov 2023 21:15:58 +0100 Subject: [PATCH 12/22] simplify function in table schema --- core/src/databases/database.rs | 1 + core/src/databases/table_schema.rs | 36 ++++++++++++++---------------- 2 files changed, 18 insertions(+), 19 deletions(-) diff --git a/core/src/databases/database.rs b/core/src/databases/database.rs index 59f0b7c4f6a4..21c280abd3aa 100644 --- a/core/src/databases/database.rs +++ b/core/src/databases/database.rs @@ -221,6 +221,7 @@ impl Database { let mut stmt = conn.prepare(query)?; + // copy the column names into a vector of strings let column_names = stmt .column_names() .into_iter() diff --git a/core/src/databases/table_schema.rs b/core/src/databases/table_schema.rs index c6e7aa3b0937..b9be07188e0f 100644 --- a/core/src/databases/table_schema.rs +++ b/core/src/databases/table_schema.rs @@ -77,25 +77,23 @@ impl TableSchema { } pub fn get_create_table_sql_string(&self, table_name: &str) -> String { - let mut create_table = format!("CREATE TABLE \"{}\" (", table_name); - - for (name, field_type) in &self.0 { - let sql_type = match field_type { - TableSchemaFieldType::Int => "INT", - TableSchemaFieldType::Float => "REAL", - TableSchemaFieldType::Text => "TEXT", - TableSchemaFieldType::Bool => "BOOLEAN", - }; - - create_table.push_str(&format!("\"{}\" {}, ", name, sql_type)); - } - - // Remove the trailing comma and space, then close the parentheses. - let len = create_table.len(); - create_table.truncate(len - 2); - create_table.push_str(");"); - - create_table + format!( + "CREATE TABLE \"{}\" ({})", + table_name, + self.0 + .iter() + .map(|(name, field_type)| { + let sql_type = match field_type { + TableSchemaFieldType::Int => "INT", + TableSchemaFieldType::Float => "REAL", + TableSchemaFieldType::Text => "TEXT", + TableSchemaFieldType::Bool => "BOOLEAN", + }; + format!("\"{}\" {}", name, sql_type) + }) + .collect::>() + .join(", ") + ) } pub fn get_insert_row_sql_string( From a23754fce9468c7c5cb55f6de9a1fae495399104 Mon Sep 17 00:00:00 2001 From: Henry Fontanier Date: Wed, 15 Nov 2023 15:27:13 +0100 Subject: [PATCH 13/22] blazingly faster --- core/Cargo.lock | 1 + core/Cargo.toml | 2 +- core/src/databases/database.rs | 50 ++++++++++------- core/src/databases/table_schema.rs | 89 +++++++++++------------------- 4 files changed, 65 insertions(+), 77 deletions(-) diff --git a/core/Cargo.lock b/core/Cargo.lock index 9752a5cb0409..46cac1c2d331 100644 --- a/core/Cargo.lock +++ b/core/Cargo.lock @@ -2469,6 +2469,7 @@ dependencies = [ "fallible-streaming-iterator", "hashlink", "libsqlite3-sys", + "serde_json", "smallvec", ] diff --git a/core/Cargo.toml b/core/Cargo.toml index 882aaa8aea05..6a15d625954b 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -30,7 +30,7 @@ rand = "0.8" uuid = { version = "1.1", features = ["v4"] } parking_lot = "0.12" axum = "0.5" -rusqlite = { version = "0.29", features = ["bundled"] } +rusqlite = { version = "0.29", features = ["bundled", "serde_json"] } tokio-postgres = "0.7" bb8 = "0.8" bb8-postgres = "0.8" diff --git a/core/src/databases/database.rs b/core/src/databases/database.rs index 21c280abd3aa..9ac22cf7c3a7 100644 --- a/core/src/databases/database.rs +++ b/core/src/databases/database.rs @@ -1,7 +1,8 @@ use crate::{project::Project, stores::store::Store, utils}; use anyhow::{anyhow, Result}; + use rayon::prelude::*; -use rusqlite::{Connection, ToSql}; +use rusqlite::{params_from_iter, Connection}; use serde::{Deserialize, Serialize}; use serde_json::Value; use std::collections::HashMap; @@ -164,6 +165,7 @@ impl Database { )); let insert_execute_start = utils::now(); + rows_by_table .iter() .filter(|(_, rows)| !rows.is_empty()) @@ -172,26 +174,34 @@ impl Database { .get(table_name) .ok_or_else(|| anyhow!("No schema found for table {}", table_name))?; - rows.iter() - .map(|row| { - match table_schema - .get_insert_row_sql_string(table_name, &row.content) - { - Ok((query, boxed_params)) => { - let params_refs: Vec<&dyn ToSql> = boxed_params - .iter() - .map(|param| &**param as &dyn ToSql) - .collect(); - - match conn.execute(&query, params_refs.as_slice()) { - Ok(res) => Ok(res), - Err(e) => Err(anyhow!("Error: {}", e)), - } - } - Err(e) => Err(anyhow!("Error: {}", e)), - } + let mut table_insert_sql_stmt = + conn.prepare(table_schema.get_insert_sql(table_name).as_str())?; + let column_names = table_insert_sql_stmt.column_names(); + + let param_values: Vec> = rows + .par_iter() + .map(|r| match r.content.as_object() { + None => Err(anyhow!("Row content is not an object")), + Some(object) => column_names + .iter() + .map(|col| match object.get(*col) { + Some(x) => Ok(x), + None => Ok(&Value::Null), + }) + .collect::>>(), }) - .collect::>>() + .collect::>>()?; + + param_values + .iter() + .map(|values| { + table_insert_sql_stmt + .execute(params_from_iter(values)) + .map_err(|e| anyhow!("Error inserting row: {}", e)) + }) + .collect::>>()?; + + Ok(()) }) .collect::>>()?; utils::done(&format!( diff --git a/core/src/databases/table_schema.rs b/core/src/databases/table_schema.rs index b9be07188e0f..42b458c8121b 100644 --- a/core/src/databases/table_schema.rs +++ b/core/src/databases/table_schema.rs @@ -2,7 +2,6 @@ use std::collections::HashMap; use anyhow::{anyhow, Result}; -use rusqlite::ToSql; use serde::{Deserialize, Serialize}; use serde_json::Value; @@ -96,49 +95,23 @@ impl TableSchema { ) } - pub fn get_insert_row_sql_string( - &self, - table_name: &str, - row_content: &Value, - ) -> Result<(String, Vec>)> { - let row_content_map = row_content - .as_object() - .ok_or_else(|| anyhow!("Row content is not an object"))?; - + pub fn get_insert_sql(&self, table_name: &str) -> String { let field_names: Vec<&String> = self.0.keys().collect(); - let fields = field_names - .iter() - .map(|name| format!("\"{}\"", name)) - .collect::>() - .join(", "); - let placeholders = field_names - .iter() - .enumerate() - .map(|(i, _)| format!("?{}", i + 1)) - .collect::>() - .join(", "); - - let params: Vec> = field_names - .iter() - .map(|name| match row_content_map.get(*name) { - Some(Value::Bool(b)) => Ok(Box::new(*b) as Box), - Some(Value::Number(n)) => n - .as_i64() - .map(|i| Box::new(i) as Box) - .or_else(|| n.as_f64().map(|f| Box::new(f) as Box)) - .ok_or_else(|| anyhow!("Invalid number value for field {}", name)), - Some(Value::String(s)) => Ok(Box::new(s.clone()) as Box), - Some(Value::Null) | None => Ok(Box::new(rusqlite::types::Null) as Box), - Some(_) => Err(anyhow!("Unsupported value type for field {}", name)), - }) - .collect::>>>()?; - - let insert_row = format!( + format!( "INSERT INTO \"{}\" ({}) VALUES ({});", - table_name, fields, placeholders - ); - - Ok((insert_row, params)) + table_name, + field_names + .iter() + .map(|name| format!("\"{}\"", name)) + .collect::>() + .join(", "), + field_names + .iter() + .enumerate() + .map(|(i, _)| format!("?{}", i + 1)) + .collect::>() + .join(", ") + ) } } @@ -146,7 +119,7 @@ impl TableSchema { mod tests { use super::*; use crate::utils; - use rusqlite::Connection; + use rusqlite::{params_from_iter, Connection}; use serde_json::json; #[test] @@ -287,15 +260,18 @@ mod tests { "field4": true }); - let (sql_string, boxed_params) = - schema.get_insert_row_sql_string("test_table", &row_content)?; + let sql = schema.get_insert_sql("test_table"); + let mut stmt = conn.prepare(&sql)?; + let content_object = row_content.as_object().unwrap(); - let params_refs: Vec<&dyn ToSql> = boxed_params - .iter() - .map(|param| &**param as &dyn ToSql) - .collect(); + stmt.execute(params_from_iter( + stmt.column_names() + .iter() + .map(|n| content_object.get(*n).unwrap().clone()) + .collect::>(), + ))?; - conn.execute(&sql_string, params_refs.as_slice())?; + // conn.execute(&sql_string, params_from_iter(row_content.as_object().))?; let mut stmt = conn.prepare("SELECT * FROM test_table;")?; let mut rows = stmt.query([])?; @@ -328,14 +304,15 @@ mod tests { // Missing field3 and field4 }); - let (sql_string, boxed_params) = - schema.get_insert_row_sql_string("test_table", &row_content)?; - let params_refs: Vec<&dyn ToSql> = boxed_params + let mut insert_stmt = conn.prepare("test_table")?; + + let params = insert_stmt + .column_names() .iter() - .map(|param| &**param as &dyn ToSql) - .collect(); + .map(|n| row_content.as_object().unwrap().get(*n).unwrap().clone()) + .collect::>(); - conn.execute(&sql_string, params_refs.as_slice())?; + insert_stmt.execute(params_from_iter(params))?; let mut stmt = conn.prepare("SELECT * FROM test_table;")?; let mut rows = stmt.query([])?; From 5ee076bcd6193dfa8ac2e5c76dd7db2a5c89f30c Mon Sep 17 00:00:00 2001 From: Henry Fontanier Date: Wed, 15 Nov 2023 18:24:28 +0100 Subject: [PATCH 14/22] make everything work --- core/src/databases/database.rs | 24 ++----- core/src/databases/table_schema.rs | 104 +++++++++++++++++------------ 2 files changed, 69 insertions(+), 59 deletions(-) diff --git a/core/src/databases/database.rs b/core/src/databases/database.rs index 9ac22cf7c3a7..b31e43310327 100644 --- a/core/src/databases/database.rs +++ b/core/src/databases/database.rs @@ -173,30 +173,18 @@ impl Database { let table_schema = table_schemas .get(table_name) .ok_or_else(|| anyhow!("No schema found for table {}", table_name))?; + let (sql, field_names) = table_schema.get_insert_sql(table_name); + let mut stmt = conn.prepare(&sql)?; - let mut table_insert_sql_stmt = - conn.prepare(table_schema.get_insert_sql(table_name).as_str())?; - let column_names = table_insert_sql_stmt.column_names(); - - let param_values: Vec> = rows + let params: Vec> = rows .par_iter() - .map(|r| match r.content.as_object() { - None => Err(anyhow!("Row content is not an object")), - Some(object) => column_names - .iter() - .map(|col| match object.get(*col) { - Some(x) => Ok(x), - None => Ok(&Value::Null), - }) - .collect::>>(), - }) + .map(|r| table_schema.get_insert_params(&field_names, r)) .collect::>>()?; - param_values + params .iter() .map(|values| { - table_insert_sql_stmt - .execute(params_from_iter(values)) + stmt.execute(params_from_iter(values)) .map_err(|e| anyhow!("Error inserting row: {}", e)) }) .collect::>>()?; diff --git a/core/src/databases/table_schema.rs b/core/src/databases/table_schema.rs index 42b458c8121b..b2addb1b157f 100644 --- a/core/src/databases/table_schema.rs +++ b/core/src/databases/table_schema.rs @@ -95,24 +95,48 @@ impl TableSchema { ) } - pub fn get_insert_sql(&self, table_name: &str) -> String { + pub fn get_insert_sql(&self, table_name: &str) -> (String, Vec<&String>) { let field_names: Vec<&String> = self.0.keys().collect(); - format!( - "INSERT INTO \"{}\" ({}) VALUES ({});", - table_name, - field_names - .iter() - .map(|name| format!("\"{}\"", name)) - .collect::>() - .join(", "), - field_names - .iter() - .enumerate() - .map(|(i, _)| format!("?{}", i + 1)) - .collect::>() - .join(", ") + ( + format!( + "INSERT INTO \"{}\" ({}) VALUES ({});", + table_name, + field_names + .iter() + .map(|name| format!("\"{}\"", name)) + .collect::>() + .join(", "), + field_names + .iter() + .enumerate() + .map(|(i, _)| format!("?{}", i + 1)) + .collect::>() + .join(", ") + ), + field_names, ) } + + pub fn get_insert_params( + &self, + field_names: &Vec<&String>, + row: &DatabaseRow, + ) -> Result> { + match row.content().as_object() { + None => Err(anyhow!("Row content is not an object")), + Some(object) => field_names + .iter() + .map(|col| match object.get(*col) { + // convert bools to 1/0 for SQLite + Some(Value::Bool(true)) => Ok(1.into()), + Some(Value::Bool(false)) => Ok(0.into()), + Some(x) => Ok(x.clone()), + // if the field is missing, insert null + None => Ok(Value::Null), + }) + .collect::>>(), + } + } } #[cfg(test)] @@ -253,25 +277,25 @@ mod tests { let schema = create_test_schema()?; let conn = setup_in_memory_db(&schema)?; - let row_content = json!({ - "field1": 1, - "field2": 2.4, - "field3": "text", - "field4": true - }); + let row = DatabaseRow::new( + utils::now(), + None, + &json!({ + "field1": 1, + "field2": 2.4, + "field3": "text", + "field4": true + }), + ); - let sql = schema.get_insert_sql("test_table"); - let mut stmt = conn.prepare(&sql)?; - let content_object = row_content.as_object().unwrap(); + let (sql, field_names) = schema.get_insert_sql("test_table"); - stmt.execute(params_from_iter( - stmt.column_names() - .iter() - .map(|n| content_object.get(*n).unwrap().clone()) - .collect::>(), - ))?; + let params_vec = schema.get_insert_params(&field_names, &row)?; + println!("{:?}", params_vec); - // conn.execute(&sql_string, params_from_iter(row_content.as_object().))?; + let mut stmt = conn.prepare(&sql)?; + + stmt.execute(params_from_iter(params_vec))?; let mut stmt = conn.prepare("SELECT * FROM test_table;")?; let mut rows = stmt.query([])?; @@ -284,7 +308,7 @@ mod tests { assert_eq!(field1, 1); assert_eq!(field2, 2.4); - assert_eq!(field3, "text"); + assert_eq!(field3, "\"text\""); assert_eq!(field4, true); } else { return Err(anyhow!("No rows found after insert")); @@ -304,15 +328,13 @@ mod tests { // Missing field3 and field4 }); - let mut insert_stmt = conn.prepare("test_table")?; - - let params = insert_stmt - .column_names() - .iter() - .map(|n| row_content.as_object().unwrap().get(*n).unwrap().clone()) - .collect::>(); - - insert_stmt.execute(params_from_iter(params))?; + let (sql, field_names) = schema.get_insert_sql("test_table"); + let params = params_from_iter(schema.get_insert_params( + &field_names, + &DatabaseRow::new(utils::now(), Some("1".to_string()), &row_content), + )?); + let mut stmt = conn.prepare(&sql)?; + stmt.execute(params)?; let mut stmt = conn.prepare("SELECT * FROM test_table;")?; let mut rows = stmt.query([])?; From 75ffdbbda1c72fadb5325158346c9926f9d37b0d Mon Sep 17 00:00:00 2001 From: Henry Fontanier Date: Wed, 15 Nov 2023 18:31:00 +0100 Subject: [PATCH 15/22] more rustiness --- core/src/databases/database.rs | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/core/src/databases/database.rs b/core/src/databases/database.rs index b31e43310327..8daea4260613 100644 --- a/core/src/databases/database.rs +++ b/core/src/databases/database.rs @@ -165,14 +165,15 @@ impl Database { )); let insert_execute_start = utils::now(); - rows_by_table .iter() .filter(|(_, rows)| !rows.is_empty()) .map(|(table_name, rows)| { - let table_schema = table_schemas - .get(table_name) - .ok_or_else(|| anyhow!("No schema found for table {}", table_name))?; + let table_schema = match table_schemas.get(table_name) { + Some(s) => Ok(s), + None => Err(anyhow!("No schema found for table {}", table_name)), + }?; + let (sql, field_names) = table_schema.get_insert_sql(table_name); let mut stmt = conn.prepare(&sql)?; @@ -183,13 +184,11 @@ impl Database { params .iter() - .map(|values| { - stmt.execute(params_from_iter(values)) - .map_err(|e| anyhow!("Error inserting row: {}", e)) + .map(|values| match stmt.execute(params_from_iter(values)) { + Ok(_) => Ok(()), + Err(e) => Err(anyhow!("Error inserting row: {}", e)), }) - .collect::>>()?; - - Ok(()) + .collect::>>() }) .collect::>>()?; utils::done(&format!( From d93d04cec9a6d84e89a855e16d890218425f9acc Mon Sep 17 00:00:00 2001 From: Henry Fontanier Date: Wed, 15 Nov 2023 18:38:10 +0100 Subject: [PATCH 16/22] don't clone rows --- core/src/databases/database.rs | 27 ++++++++++++--------------- 1 file changed, 12 insertions(+), 15 deletions(-) diff --git a/core/src/databases/database.rs b/core/src/databases/database.rs index 8daea4260613..0b82fe424c99 100644 --- a/core/src/databases/database.rs +++ b/core/src/databases/database.rs @@ -85,29 +85,26 @@ impl Database { .into_iter() .collect::>(); + let schema = rows + .par_iter() + .map(|(table, r)| { + Ok(( + table.table_id().to_string(), + DatabaseSchemaTable::new(table.clone(), TableSchema::from_rows(&r)?), + )) + }) + .collect::>>()?; + let returned_rows = match return_rows { true => Some( - rows.clone() - .into_iter() + rows.into_iter() .map(|(table, rows)| (table.table_id().to_string(), rows)) .collect::>(), ), false => None, }; - Ok(( - DatabaseSchema( - rows.into_par_iter() - .map(|(table, r)| { - Ok(( - table.table_id().to_string(), - DatabaseSchemaTable::new(table, TableSchema::from_rows(&r)?), - )) - }) - .collect::>>()?, - ), - returned_rows, - )) + Ok((DatabaseSchema(schema), returned_rows)) } } } From 2ee45d9a51a19d27c73fb8aaae7fa461f81555d5 Mon Sep 17 00:00:00 2001 From: Henry Fontanier Date: Thu, 16 Nov 2023 11:27:19 +0100 Subject: [PATCH 17/22] use narrowed enum --- core/src/databases/database.rs | 4 +-- core/src/databases/table_schema.rs | 46 +++++++++++++++++++++++++----- 2 files changed, 41 insertions(+), 9 deletions(-) diff --git a/core/src/databases/database.rs b/core/src/databases/database.rs index 0b82fe424c99..1e76397ba489 100644 --- a/core/src/databases/database.rs +++ b/core/src/databases/database.rs @@ -7,7 +7,7 @@ use serde::{Deserialize, Serialize}; use serde_json::Value; use std::collections::HashMap; -use super::table_schema::TableSchema; +use super::table_schema::{SqlParam, TableSchema}; #[derive(Debug, Clone, Copy, Serialize, PartialEq, Deserialize)] #[serde(rename_all = "lowercase")] @@ -174,7 +174,7 @@ impl Database { let (sql, field_names) = table_schema.get_insert_sql(table_name); let mut stmt = conn.prepare(&sql)?; - let params: Vec> = rows + let params: Vec> = rows .par_iter() .map(|r| table_schema.get_insert_params(&field_names, r)) .collect::>>()?; diff --git a/core/src/databases/table_schema.rs b/core/src/databases/table_schema.rs index b2addb1b157f..3e19f84142e4 100644 --- a/core/src/databases/table_schema.rs +++ b/core/src/databases/table_schema.rs @@ -2,6 +2,7 @@ use std::collections::HashMap; use anyhow::{anyhow, Result}; +use rusqlite::{types::ToSqlOutput, ToSql}; use serde::{Deserialize, Serialize}; use serde_json::Value; @@ -16,6 +17,30 @@ pub enum TableSchemaFieldType { Bool, } +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] +pub enum SqlParam { + Int(i64), + Float(f64), + Text(String), + Bool(bool), + Null, +} + +impl ToSql for SqlParam { + fn to_sql(&self) -> rusqlite::Result> { + match self { + SqlParam::Int(i) => i.to_sql(), + SqlParam::Float(f) => f.to_sql(), + SqlParam::Text(s) => Ok(ToSqlOutput::Owned(format!("\"{}\"", s).into())), + SqlParam::Bool(b) => match b { + true => 1.to_sql(), + false => 0.to_sql(), + }, + SqlParam::Null => Ok(ToSqlOutput::Owned(rusqlite::types::Value::Null)), + } + } +} + #[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] pub struct TableSchema(HashMap); @@ -121,18 +146,25 @@ impl TableSchema { &self, field_names: &Vec<&String>, row: &DatabaseRow, - ) -> Result> { + ) -> Result> { match row.content().as_object() { None => Err(anyhow!("Row content is not an object")), Some(object) => field_names .iter() .map(|col| match object.get(*col) { - // convert bools to 1/0 for SQLite - Some(Value::Bool(true)) => Ok(1.into()), - Some(Value::Bool(false)) => Ok(0.into()), - Some(x) => Ok(x.clone()), - // if the field is missing, insert null - None => Ok(Value::Null), + Some(Value::Bool(b)) => Ok(SqlParam::Bool(*b)), + Some(Value::Number(x)) => { + if x.is_i64() { + Ok(SqlParam::Int(x.as_i64().unwrap())) + } else if x.is_f64() { + Ok(SqlParam::Float(x.as_f64().unwrap())) + } else { + Err(anyhow!("Number is not an i64 or f64")) + } + } + Some(Value::String(s)) => Ok(SqlParam::Text(s.clone())), + None | Some(Value::Null) => Ok(SqlParam::Null), + _ => Err(anyhow!("Cannot convert value {:?} to SqlParam", object)), }) .collect::>>(), } From 0240957f2a57bce50f38466b46648fc22e82200f Mon Sep 17 00:00:00 2001 From: Henry Fontanier Date: Thu, 16 Nov 2023 12:20:21 +0100 Subject: [PATCH 18/22] remove serde_json extension for rusqlite --- core/Cargo.lock | 1 - core/Cargo.toml | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/core/Cargo.lock b/core/Cargo.lock index 46cac1c2d331..9752a5cb0409 100644 --- a/core/Cargo.lock +++ b/core/Cargo.lock @@ -2469,7 +2469,6 @@ dependencies = [ "fallible-streaming-iterator", "hashlink", "libsqlite3-sys", - "serde_json", "smallvec", ] diff --git a/core/Cargo.toml b/core/Cargo.toml index 6a15d625954b..882aaa8aea05 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -30,7 +30,7 @@ rand = "0.8" uuid = { version = "1.1", features = ["v4"] } parking_lot = "0.12" axum = "0.5" -rusqlite = { version = "0.29", features = ["bundled", "serde_json"] } +rusqlite = { version = "0.29", features = ["bundled"] } tokio-postgres = "0.7" bb8 = "0.8" bb8-postgres = "0.8" From cb241b186ebfec7a5f9221965a930f6f523c3e7b Mon Sep 17 00:00:00 2001 From: Henry Fontanier Date: Thu, 16 Nov 2023 12:22:57 +0100 Subject: [PATCH 19/22] remove extra variable --- core/bin/dust_api.rs | 4 +- core/src/databases/database.rs | 111 +++++++++++++++++---------------- 2 files changed, 60 insertions(+), 55 deletions(-) diff --git a/core/bin/dust_api.rs b/core/bin/dust_api.rs index 8f4bf5dcbe8e..feb967f3bec6 100644 --- a/core/bin/dust_api.rs +++ b/core/bin/dust_api.rs @@ -2032,14 +2032,14 @@ async fn databases_schema_retrieve( &format!("No database found for id `{}`", database_id), None, ), - Ok(Some(db)) => match db.get_schema(&project, state.store.clone(), false).await { + Ok(Some(db)) => match db.get_schema(&project, state.store.clone()).await { Err(e) => error_response( StatusCode::INTERNAL_SERVER_ERROR, "internal_server_error", "Failed to retrieve database schema", Some(e), ), - Ok((schema, _)) => ( + Ok(schema) => ( StatusCode::OK, Json(APIResponse { error: None, diff --git a/core/src/databases/database.rs b/core/src/databases/database.rs index 1e76397ba489..a5d8eed54bd7 100644 --- a/core/src/databases/database.rs +++ b/core/src/databases/database.rs @@ -7,7 +7,7 @@ use serde::{Deserialize, Serialize}; use serde_json::Value; use std::collections::HashMap; -use super::table_schema::{SqlParam, TableSchema}; +use super::table_schema::TableSchema; #[derive(Debug, Clone, Copy, Serialize, PartialEq, Deserialize)] #[serde(rename_all = "lowercase")] @@ -49,41 +49,11 @@ impl Database { &self, project: &Project, store: Box, - return_rows: bool, - ) -> Result<(DatabaseSchema, Option>>)> { + ) -> Result { match self.db_type { DatabaseType::REMOTE => Err(anyhow!("Remote DB not implemented.")), DatabaseType::LOCAL => { - let (tables, _) = store - .list_databases_tables(&project, &self.data_source_id, &self.database_id, None) - .await?; - - // Concurrently retrieve table rows. - let rows = futures::future::try_join_all( - tables - .into_iter() - .map(|table| { - let store = store.clone(); - - async move { - let (rows, _) = store - .list_database_rows( - project, - self.data_source_id.as_str(), - self.database_id.as_str(), - table.table_id(), - None, - ) - .await?; - - Ok::<_, anyhow::Error>((table, rows)) - } - }) - .collect::>(), - ) - .await? - .into_iter() - .collect::>(); + let rows = self.get_rows(project, store).await?; let schema = rows .par_iter() @@ -95,16 +65,7 @@ impl Database { }) .collect::>>()?; - let returned_rows = match return_rows { - true => Some( - rows.into_iter() - .map(|(table, rows)| (table.table_id().to_string(), rows)) - .collect::>(), - ), - false => None, - }; - - Ok((DatabaseSchema(schema), returned_rows)) + Ok(DatabaseSchema(schema)) } } } @@ -120,16 +81,26 @@ impl Database { )), DatabaseType::LOCAL => { let time_build_db_start = utils::now(); - let (schema, rows_by_table) = self.get_schema(project, store.clone(), true).await?; - let rows_by_table = match rows_by_table { - Some(rows) => rows, - None => return Err(anyhow!("No rows found")), - }; + + let schema = self.get_schema(project, store.clone()).await?; utils::done(&format!( "DSSTRUCTSTAT Finished retrieving schema: duration={}ms", utils::now() - time_build_db_start )); + let time_get_rows_start = utils::now(); + let rows_by_table = match self.get_rows(project, store.clone()).await { + Ok(rows) => Ok(rows + .into_iter() + .map(|(table, rows)| (table.table_id().to_string(), rows)) + .collect::>()), + _ => Err(anyhow!("Error retrieving rows from database.")), + }?; + utils::done(&format!( + "DSSTRUCTSTAT Finished retrieving rows: duration={}ms", + utils::now() - time_get_rows_start + )); + let table_schemas: HashMap = schema .iter() .filter(|(_, table)| !table.schema.is_empty()) @@ -174,12 +145,9 @@ impl Database { let (sql, field_names) = table_schema.get_insert_sql(table_name); let mut stmt = conn.prepare(&sql)?; - let params: Vec> = rows - .par_iter() + rows.par_iter() .map(|r| table_schema.get_insert_params(&field_names, r)) - .collect::>>()?; - - params + .collect::>>()? .iter() .map(|values| match stmt.execute(params_from_iter(values)) { Ok(_) => Ok(()), @@ -299,6 +267,43 @@ impl Database { } } + pub async fn get_rows( + &self, + project: &Project, + store: Box, + ) -> Result)>> { + let (tables, _) = store + .list_databases_tables(&project, &self.data_source_id, &self.database_id, None) + .await?; + + // Concurrently retrieve table rows. + Ok(futures::future::try_join_all( + tables + .into_iter() + .map(|table| { + let store = store.clone(); + + async move { + let (rows, _) = store + .list_database_rows( + project, + self.data_source_id.as_str(), + self.database_id.as_str(), + table.table_id(), + None, + ) + .await?; + + Ok::<_, anyhow::Error>((table, rows)) + } + }) + .collect::>(), + ) + .await? + .into_iter() + .collect::>()) + } + // Getters pub fn created(&self) -> u64 { self.created From a9058c4bcc7689c5827be3f0d47b2cfcfe6e4b7c Mon Sep 17 00:00:00 2001 From: Henry Fontanier Date: Thu, 16 Nov 2023 12:38:57 +0100 Subject: [PATCH 20/22] remove extraneous var --- core/src/databases/database.rs | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/core/src/databases/database.rs b/core/src/databases/database.rs index a5d8eed54bd7..cb665b3e8fa2 100644 --- a/core/src/databases/database.rs +++ b/core/src/databases/database.rs @@ -101,12 +101,6 @@ impl Database { utils::now() - time_get_rows_start )); - let table_schemas: HashMap = schema - .iter() - .filter(|(_, table)| !table.schema.is_empty()) - .map(|(table_name, table)| (table_name.clone(), table.schema.clone())) - .collect(); - let generate_create_table_sql_start = utils::now(); let create_tables_sql: String = schema .iter() @@ -137,16 +131,16 @@ impl Database { .iter() .filter(|(_, rows)| !rows.is_empty()) .map(|(table_name, rows)| { - let table_schema = match table_schemas.get(table_name) { + let table_schema = match schema.get(table_name) { Some(s) => Ok(s), None => Err(anyhow!("No schema found for table {}", table_name)), }?; - let (sql, field_names) = table_schema.get_insert_sql(table_name); + let (sql, field_names) = table_schema.schema.get_insert_sql(table_name); let mut stmt = conn.prepare(&sql)?; rows.par_iter() - .map(|r| table_schema.get_insert_params(&field_names, r)) + .map(|r| table_schema.schema.get_insert_params(&field_names, r)) .collect::>>()? .iter() .map(|values| match stmt.execute(params_from_iter(values)) { @@ -411,4 +405,7 @@ impl DatabaseSchema { pub fn iter(&self) -> std::collections::hash_map::Iter { self.0.iter() } + pub fn get(&self, table_name: &str) -> Option<&DatabaseSchemaTable> { + self.0.get(table_name) + } } From a8ef53a191322032c4f2f252d89e08b74e8ff6ad Mon Sep 17 00:00:00 2001 From: Henry Fontanier Date: Thu, 16 Nov 2023 12:40:33 +0100 Subject: [PATCH 21/22] remove return --- core/src/databases/database.rs | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/core/src/databases/database.rs b/core/src/databases/database.rs index cb665b3e8fa2..c25bbc63a39c 100644 --- a/core/src/databases/database.rs +++ b/core/src/databases/database.rs @@ -194,13 +194,11 @@ impl Database { Ok(( column_name.clone(), match row.get(i) { - Err(e) => { - return Err(anyhow!( - "Failed to retrieve value for column {}: {}", - column_name, - e - )) - } + Err(e) => Err(anyhow!( + "Failed to retrieve value for column {}: {}", + column_name, + e + )), Ok(v) => match v { rusqlite::types::Value::Integer(i) => { Ok(serde_json::Value::Number(i.into())) From 93bf9f4a0c0b9a7ca7e6caff4545c34ec3c97e67 Mon Sep 17 00:00:00 2001 From: Henry Fontanier Date: Thu, 16 Nov 2023 13:18:22 +0100 Subject: [PATCH 22/22] call params_from_iter in par_iter --- core/src/databases/database.rs | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/core/src/databases/database.rs b/core/src/databases/database.rs index c25bbc63a39c..3e182bcd3c81 100644 --- a/core/src/databases/database.rs +++ b/core/src/databases/database.rs @@ -140,10 +140,19 @@ impl Database { let mut stmt = conn.prepare(&sql)?; rows.par_iter() - .map(|r| table_schema.schema.get_insert_params(&field_names, r)) + .map( + |r| match table_schema.schema.get_insert_params(&field_names, r) { + Ok(params) => Ok(params_from_iter(params)), + Err(e) => Err(anyhow!( + "Error getting insert params for row {}: {}", + r.row_id().unwrap_or_else(|| String::from("")), + e + )), + }, + ) .collect::>>()? - .iter() - .map(|values| match stmt.execute(params_from_iter(values)) { + .into_iter() + .map(|params| match stmt.execute(params) { Ok(_) => Ok(()), Err(e) => Err(anyhow!("Error inserting row: {}", e)), })