From cb241b186ebfec7a5f9221965a930f6f523c3e7b Mon Sep 17 00:00:00 2001 From: Henry Fontanier Date: Thu, 16 Nov 2023 12:22:57 +0100 Subject: [PATCH] remove extra variable --- core/bin/dust_api.rs | 4 +- core/src/databases/database.rs | 111 +++++++++++++++++---------------- 2 files changed, 60 insertions(+), 55 deletions(-) diff --git a/core/bin/dust_api.rs b/core/bin/dust_api.rs index 8f4bf5dcbe8e..feb967f3bec6 100644 --- a/core/bin/dust_api.rs +++ b/core/bin/dust_api.rs @@ -2032,14 +2032,14 @@ async fn databases_schema_retrieve( &format!("No database found for id `{}`", database_id), None, ), - Ok(Some(db)) => match db.get_schema(&project, state.store.clone(), false).await { + Ok(Some(db)) => match db.get_schema(&project, state.store.clone()).await { Err(e) => error_response( StatusCode::INTERNAL_SERVER_ERROR, "internal_server_error", "Failed to retrieve database schema", Some(e), ), - Ok((schema, _)) => ( + Ok(schema) => ( StatusCode::OK, Json(APIResponse { error: None, diff --git a/core/src/databases/database.rs b/core/src/databases/database.rs index 1e76397ba489..a5d8eed54bd7 100644 --- a/core/src/databases/database.rs +++ b/core/src/databases/database.rs @@ -7,7 +7,7 @@ use serde::{Deserialize, Serialize}; use serde_json::Value; use std::collections::HashMap; -use super::table_schema::{SqlParam, TableSchema}; +use super::table_schema::TableSchema; #[derive(Debug, Clone, Copy, Serialize, PartialEq, Deserialize)] #[serde(rename_all = "lowercase")] @@ -49,41 +49,11 @@ impl Database { &self, project: &Project, store: Box, - return_rows: bool, - ) -> Result<(DatabaseSchema, Option>>)> { + ) -> Result { match self.db_type { DatabaseType::REMOTE => Err(anyhow!("Remote DB not implemented.")), DatabaseType::LOCAL => { - let (tables, _) = store - .list_databases_tables(&project, &self.data_source_id, &self.database_id, None) - .await?; - - // Concurrently retrieve table rows. - let rows = futures::future::try_join_all( - tables - .into_iter() - .map(|table| { - let store = store.clone(); - - async move { - let (rows, _) = store - .list_database_rows( - project, - self.data_source_id.as_str(), - self.database_id.as_str(), - table.table_id(), - None, - ) - .await?; - - Ok::<_, anyhow::Error>((table, rows)) - } - }) - .collect::>(), - ) - .await? - .into_iter() - .collect::>(); + let rows = self.get_rows(project, store).await?; let schema = rows .par_iter() @@ -95,16 +65,7 @@ impl Database { }) .collect::>>()?; - let returned_rows = match return_rows { - true => Some( - rows.into_iter() - .map(|(table, rows)| (table.table_id().to_string(), rows)) - .collect::>(), - ), - false => None, - }; - - Ok((DatabaseSchema(schema), returned_rows)) + Ok(DatabaseSchema(schema)) } } } @@ -120,16 +81,26 @@ impl Database { )), DatabaseType::LOCAL => { let time_build_db_start = utils::now(); - let (schema, rows_by_table) = self.get_schema(project, store.clone(), true).await?; - let rows_by_table = match rows_by_table { - Some(rows) => rows, - None => return Err(anyhow!("No rows found")), - }; + + let schema = self.get_schema(project, store.clone()).await?; utils::done(&format!( "DSSTRUCTSTAT Finished retrieving schema: duration={}ms", utils::now() - time_build_db_start )); + let time_get_rows_start = utils::now(); + let rows_by_table = match self.get_rows(project, store.clone()).await { + Ok(rows) => Ok(rows + .into_iter() + .map(|(table, rows)| (table.table_id().to_string(), rows)) + .collect::>()), + _ => Err(anyhow!("Error retrieving rows from database.")), + }?; + utils::done(&format!( + "DSSTRUCTSTAT Finished retrieving rows: duration={}ms", + utils::now() - time_get_rows_start + )); + let table_schemas: HashMap = schema .iter() .filter(|(_, table)| !table.schema.is_empty()) @@ -174,12 +145,9 @@ impl Database { let (sql, field_names) = table_schema.get_insert_sql(table_name); let mut stmt = conn.prepare(&sql)?; - let params: Vec> = rows - .par_iter() + rows.par_iter() .map(|r| table_schema.get_insert_params(&field_names, r)) - .collect::>>()?; - - params + .collect::>>()? .iter() .map(|values| match stmt.execute(params_from_iter(values)) { Ok(_) => Ok(()), @@ -299,6 +267,43 @@ impl Database { } } + pub async fn get_rows( + &self, + project: &Project, + store: Box, + ) -> Result)>> { + let (tables, _) = store + .list_databases_tables(&project, &self.data_source_id, &self.database_id, None) + .await?; + + // Concurrently retrieve table rows. + Ok(futures::future::try_join_all( + tables + .into_iter() + .map(|table| { + let store = store.clone(); + + async move { + let (rows, _) = store + .list_database_rows( + project, + self.data_source_id.as_str(), + self.database_id.as_str(), + table.table_id(), + None, + ) + .await?; + + Ok::<_, anyhow::Error>((table, rows)) + } + }) + .collect::>(), + ) + .await? + .into_iter() + .collect::>()) + } + // Getters pub fn created(&self) -> u64 { self.created