Skip to content

Commit

Permalink
remove extra variable
Browse files Browse the repository at this point in the history
  • Loading branch information
fontanierh committed Nov 16, 2023
1 parent 0240957 commit cb241b1
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 55 deletions.
4 changes: 2 additions & 2 deletions core/bin/dust_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2032,14 +2032,14 @@ async fn databases_schema_retrieve(
&format!("No database found for id `{}`", database_id),
None,
),
Ok(Some(db)) => match db.get_schema(&project, state.store.clone(), false).await {
Ok(Some(db)) => match db.get_schema(&project, state.store.clone()).await {
Err(e) => error_response(
StatusCode::INTERNAL_SERVER_ERROR,
"internal_server_error",
"Failed to retrieve database schema",
Some(e),
),
Ok((schema, _)) => (
Ok(schema) => (
StatusCode::OK,
Json(APIResponse {
error: None,
Expand Down
111 changes: 58 additions & 53 deletions core/src/databases/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;

use super::table_schema::{SqlParam, TableSchema};
use super::table_schema::TableSchema;

#[derive(Debug, Clone, Copy, Serialize, PartialEq, Deserialize)]
#[serde(rename_all = "lowercase")]
Expand Down Expand Up @@ -49,41 +49,11 @@ impl Database {
&self,
project: &Project,
store: Box<dyn Store + Sync + Send>,
return_rows: bool,
) -> Result<(DatabaseSchema, Option<HashMap<String, Vec<DatabaseRow>>>)> {
) -> Result<DatabaseSchema> {
match self.db_type {
DatabaseType::REMOTE => Err(anyhow!("Remote DB not implemented.")),
DatabaseType::LOCAL => {
let (tables, _) = store
.list_databases_tables(&project, &self.data_source_id, &self.database_id, None)
.await?;

// Concurrently retrieve table rows.
let rows = futures::future::try_join_all(
tables
.into_iter()
.map(|table| {
let store = store.clone();

async move {
let (rows, _) = store
.list_database_rows(
project,
self.data_source_id.as_str(),
self.database_id.as_str(),
table.table_id(),
None,
)
.await?;

Ok::<_, anyhow::Error>((table, rows))
}
})
.collect::<Vec<_>>(),
)
.await?
.into_iter()
.collect::<Vec<_>>();
let rows = self.get_rows(project, store).await?;

let schema = rows
.par_iter()
Expand All @@ -95,16 +65,7 @@ impl Database {
})
.collect::<Result<HashMap<_, _>>>()?;

let returned_rows = match return_rows {
true => Some(
rows.into_iter()
.map(|(table, rows)| (table.table_id().to_string(), rows))
.collect::<HashMap<_, _>>(),
),
false => None,
};

Ok((DatabaseSchema(schema), returned_rows))
Ok(DatabaseSchema(schema))
}
}
}
Expand All @@ -120,16 +81,26 @@ impl Database {
)),
DatabaseType::LOCAL => {
let time_build_db_start = utils::now();
let (schema, rows_by_table) = self.get_schema(project, store.clone(), true).await?;
let rows_by_table = match rows_by_table {
Some(rows) => rows,
None => return Err(anyhow!("No rows found")),
};

let schema = self.get_schema(project, store.clone()).await?;
utils::done(&format!(
"DSSTRUCTSTAT Finished retrieving schema: duration={}ms",
utils::now() - time_build_db_start
));

let time_get_rows_start = utils::now();
let rows_by_table = match self.get_rows(project, store.clone()).await {
Ok(rows) => Ok(rows
.into_iter()
.map(|(table, rows)| (table.table_id().to_string(), rows))
.collect::<HashMap<_, _>>()),
_ => Err(anyhow!("Error retrieving rows from database.")),
}?;
utils::done(&format!(
"DSSTRUCTSTAT Finished retrieving rows: duration={}ms",
utils::now() - time_get_rows_start
));

let table_schemas: HashMap<String, TableSchema> = schema
.iter()
.filter(|(_, table)| !table.schema.is_empty())
Expand Down Expand Up @@ -174,12 +145,9 @@ impl Database {
let (sql, field_names) = table_schema.get_insert_sql(table_name);
let mut stmt = conn.prepare(&sql)?;

let params: Vec<Vec<SqlParam>> = rows
.par_iter()
rows.par_iter()
.map(|r| table_schema.get_insert_params(&field_names, r))
.collect::<Result<Vec<_>>>()?;

params
.collect::<Result<Vec<_>>>()?
.iter()
.map(|values| match stmt.execute(params_from_iter(values)) {
Ok(_) => Ok(()),
Expand Down Expand Up @@ -299,6 +267,43 @@ impl Database {
}
}

pub async fn get_rows(
&self,
project: &Project,
store: Box<dyn Store + Sync + Send>,
) -> Result<Vec<(DatabaseTable, Vec<DatabaseRow>)>> {
let (tables, _) = store
.list_databases_tables(&project, &self.data_source_id, &self.database_id, None)
.await?;

// Concurrently retrieve table rows.
Ok(futures::future::try_join_all(
tables
.into_iter()
.map(|table| {
let store = store.clone();

async move {
let (rows, _) = store
.list_database_rows(
project,
self.data_source_id.as_str(),
self.database_id.as_str(),
table.table_id(),
None,
)
.await?;

Ok::<_, anyhow::Error>((table, rows))
}
})
.collect::<Vec<_>>(),
)
.await?
.into_iter()
.collect::<Vec<_>>())
}

// Getters
pub fn created(&self) -> u64 {
self.created
Expand Down

0 comments on commit cb241b1

Please sign in to comment.