feat: implement DB get schema (w/ endpoint) (#2489)
* feat: implement DB get schema (w/ endpoint)

* pass &Project

* database row list: remove support for cross-table listing

* Refactor PostgresStore query for retrieving rows from a table.

* rusty iter

* remove for loop

* Fix typo in database table name and remove unused variable.
fontanierh authored Nov 13, 2023
1 parent 6feb62b commit 8d0af84
Showing 8 changed files with 367 additions and 87 deletions.
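
The "rusty iter" and "remove for loop" commits in the message above replace an imperative loop with an iterator of futures that is collected up front and then awaited concurrently via futures::future::join_all, which is exactly what get_schema does in core/src/databases/database.rs below. A minimal sketch of that pattern, assuming the futures and tokio crates, with a hypothetical fetch standing in for the store call:

// Sketch of the iterator-of-futures pattern used by get_schema below.
// Hypothetical stand-in for store.list_database_rows: returns (table_id, row_count).
async fn fetch(id: u32) -> Result<(u32, usize), String> {
    Ok((id, (id as usize) * 10))
}

#[tokio::main]
async fn main() -> Result<(), String> {
    let ids = vec![1u32, 2, 3];

    // Build every future first (nothing is awaited inside the map)...
    let futures = ids.iter().map(|&id| fetch(id)).collect::<Vec<_>>();

    // ...then drive them all concurrently; results come back in input order.
    let results = futures::future::join_all(futures).await;

    for result in results {
        let (id, rows) = result?;
        println!("table {id}: {rows} rows");
    }
    Ok(())
}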
54 changes: 54 additions & 0 deletions core/Cargo.lock

Some generated files are not rendered by default.

3 changes: 2 additions & 1 deletion core/Cargo.toml
@@ -49,4 +49,5 @@ qdrant-client = "1.6"
 tower-http = {version = "0.4", features = ["full"]}
 tracing = "0.1"
 tracing-subscriber = { version = "0.3", features = ["env-filter"] }
-deno_core = "0.200"
+deno_core = "0.200"
+rayon = "1.8.0"
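
The new rayon dependency backs the parallel schema assembly added to database.rs below. A minimal sketch of the collect-into-Result pattern it enables, with a hypothetical infer_schema standing in for TableSchema::from_rows: a par_iter of Result items collected into Result<HashMap<_, _>> short-circuits on the first Err.

use std::collections::HashMap;

use rayon::prelude::*;

// Hypothetical stand-in for TableSchema::from_rows.
fn infer_schema(rows: &[&str]) -> Result<String, String> {
    Ok(format!("{} rows", rows.len()))
}

fn main() -> Result<(), String> {
    let tables = vec![("users", vec!["a", "b"]), ("events", vec!["c"])];

    // Each table's schema is computed on the rayon thread pool; collecting
    // into Result<HashMap<_, _>> stops at the first failure.
    let schemas = tables
        .par_iter()
        .map(|(table_id, rows)| Ok((table_id.to_string(), infer_schema(rows)?)))
        .collect::<Result<HashMap<String, String>, String>>()?;

    println!("{schemas:?}");
    Ok(())
}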
61 changes: 56 additions & 5 deletions core/bin/dust_api.rs
@@ -1963,11 +1963,15 @@ async fn databases_rows_retrieve(
 struct DatabasesRowsListQuery {
     offset: usize,
     limit: usize,
-    table_id: Option<String>,
 }

 async fn databases_rows_list(
-    extract::Path((project_id, data_source_id, database_id)): extract::Path<(i64, String, String)>,
+    extract::Path((project_id, data_source_id, database_id, table_id)): extract::Path<(
+        i64,
+        String,
+        String,
+        String,
+    )>,
     extract::Query(query): extract::Query<DatabasesRowsListQuery>,
     extract::Extension(state): extract::Extension<Arc<APIState>>,
 ) -> (StatusCode, Json<APIResponse>) {
@@ -1979,7 +1983,7 @@ async fn databases_rows_list(
             &project,
             &data_source_id,
             &database_id,
-            &query.table_id,
+            &table_id,
             Some((query.limit, query.offset)),
         )
         .await
@@ -2005,6 +2009,49 @@
     }
 }

+async fn databases_schema_retrieve(
+    extract::Path((project_id, data_source_id, database_id)): extract::Path<(i64, String, String)>,
+    extract::Extension(state): extract::Extension<Arc<APIState>>,
+) -> (StatusCode, Json<APIResponse>) {
+    let project = project::Project::new_from_id(project_id);
+
+    match state
+        .store
+        .load_database(&project, &data_source_id, &database_id)
+        .await
+    {
+        Err(e) => error_response(
+            StatusCode::INTERNAL_SERVER_ERROR,
+            "internal_server_error",
+            "Failed to retrieve database",
+            Some(e),
+        ),
+        Ok(None) => error_response(
+            StatusCode::NOT_FOUND,
+            "database_not_found",
+            &format!("No database found for id `{}`", database_id),
+            None,
+        ),
+        Ok(Some(db)) => match db.get_schema(&project, state.store.clone()).await {
+            Err(e) => error_response(
+                StatusCode::INTERNAL_SERVER_ERROR,
+                "internal_server_error",
+                "Failed to retrieve database schema",
+                Some(e),
+            ),
+            Ok(schema) => (
+                StatusCode::OK,
+                Json(APIResponse {
+                    error: None,
+                    response: Some(json!({
+                        "schema": schema
+                    })),
+                }),
+            ),
+        },
+    }
+}
+
 // Misc

 #[derive(serde::Deserialize)]
@@ -2211,17 +2258,21 @@ fn main() {
             get(databases_tables_list),
         )
         .route(
-            "/projects/:project_id/data_sources/:data_source_id/databases/:database_id/tables/:table_id",
+            "/projects/:project_id/data_sources/:data_source_id/databases/:database_id/tables/:table_id/rows",
             post(databases_rows_upsert),
         )
         .route(
             "/projects/:project_id/data_sources/:data_source_id/databases/:database_id/tables/:table_id/rows/:row_id",
             get(databases_rows_retrieve),
         )
         .route(
-            "/projects/:project_id/data_sources/:data_source_id/databases/:database_id/rows",
+            "/projects/:project_id/data_sources/:data_source_id/databases/:database_id/tables/:table_id/rows",
            get(databases_rows_list),
         )
+        .route(
+            "/projects/:project_id/data_sources/:data_source_id/databases/:database_id/schema",
+            get(databases_schema_retrieve),
+        )
         // Misc
         .route("/tokenize", post(tokenize))

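With the route changes above, rows are now listed under a specific table and the new schema endpoint hangs off the database. A client-side sketch of the two endpoints (hypothetical host, project, and ids; assumes the reqwest crate with its json feature plus tokio). Note that payloads come back wrapped in the APIResponse envelope, i.e. {"error": ..., "response": {"schema": ...}}, as the handler above shows.

use serde_json::Value;

// Hypothetical host and ids; point these at a real core API deployment.
const BASE: &str = "http://localhost:3001";

#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    // Rows are now listed per table (the cross-table listing was removed).
    let rows: Value = reqwest::get(format!(
        "{BASE}/projects/1/data_sources/ds/databases/db/tables/t1/rows?offset=0&limit=10"
    ))
    .await?
    .json()
    .await?;

    // New in this commit: one inferred schema per table in the database.
    let schema: Value =
        reqwest::get(format!("{BASE}/projects/1/data_sources/ds/databases/db/schema"))
            .await?
            .json()
            .await?;

    println!("rows: {rows}\nschema: {schema}");
    Ok(())
}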
146 changes: 143 additions & 3 deletions core/src/databases/database.rs
@@ -1,12 +1,36 @@
-use serde::Serialize;
+use anyhow::{anyhow, Result};

+use crate::{project::Project, stores::store::Store};
+use rayon::prelude::*;
+use serde::{Deserialize, Serialize};
+use serde_json::Value;
+use std::collections::HashMap;
+
 use super::table_schema::TableSchema;

+#[derive(Debug, Clone, Copy, Serialize, PartialEq, Deserialize)]
+#[serde(rename_all = "lowercase")]
+pub enum DatabaseType {
+    LOCAL,
+    REMOTE,
+}
+
+impl ToString for DatabaseType {
+    fn to_string(&self) -> String {
+        match self {
+            DatabaseType::LOCAL => String::from("local"),
+            DatabaseType::REMOTE => String::from("remote"),
+        }
+    }
+}
+
 #[derive(Debug, Serialize)]
 pub struct Database {
     created: u64,
     data_source_id: String,
     database_id: String,
     name: String,
+    db_type: DatabaseType,
 }

 impl Database {
@@ -16,9 +40,106 @@ impl Database {
             data_source_id: data_source_id.to_string(),
             database_id: database_id.to_string(),
             name: name.to_string(),
+            db_type: DatabaseType::LOCAL,
         }
     }

+    pub async fn get_schema(
+        &self,
+        project: &Project,
+        store: Box<dyn Store + Sync + Send>,
+    ) -> Result<DatabaseSchema> {
+        match self.db_type {
+            DatabaseType::REMOTE => Err(anyhow!("Remote DB not implemented.")),
+            DatabaseType::LOCAL => {
+                let (tables, _) = store
+                    .list_databases_tables(&project, &self.data_source_id, &self.database_id, None)
+                    .await?;
+
+                let table_rows_futures = tables
+                    .iter()
+                    .map(|table| {
+                        let store_ref = &store;
+                        let project_ref = &project;
+                        let data_source_id = &self.data_source_id;
+                        let database_id = &self.database_id;
+                        let table_id = table.table_id().to_string();
+
+                        let rows_future = async move {
+                            store_ref
+                                .list_database_rows(
+                                    project_ref,
+                                    data_source_id,
+                                    database_id,
+                                    &table_id,
+                                    None,
+                                )
+                                .await
+                        };
+
+                        rows_future
+                    })
+                    .collect::<Vec<_>>();
+
+                // Now, we concurrently wait for all futures to complete.
+                let results: Vec<_> = futures::future::join_all(table_rows_futures).await;
+
+                let mut table_rows = HashMap::new();
+
+                for result in results {
+                    match result {
+                        Ok((rows, _)) => {
+                            let first_row = rows.first();
+                            if let Some(row) = first_row {
+                                table_rows.insert(row.table_id().to_string(), rows);
+                            }
+                        }
+                        Err(e) => return Err(e.into()),
+                    }
+                }
+
+                let table_by_id = tables
+                    .iter()
+                    .map(|table| (table.table_id().to_string(), table))
+                    .collect::<HashMap<String, &DatabaseTable>>();
+
+                let table_ids_with_rows = table_rows.keys().collect::<Vec<&String>>();
+
+                let mut schema = table_ids_with_rows
+                    .par_iter()
+                    .map(|table_id| {
+                        let table = *table_by_id.get(*table_id).unwrap();
+                        let rows = table_rows.get(*table_id).unwrap();
+
+                        let row_contents =
+                            rows.iter().map(|x| x.content()).collect::<Vec<&Value>>();
+                        let table_schema = TableSchema::from_rows(&row_contents)?;
+                        let database_schema_table =
+                            DatabaseSchemaTable::new(table.clone(), table_schema);
+                        let table_id_str = database_schema_table.table().table_id().to_string();
+
+                        // Return a tuple of (key, value) that will be directly collected into a HashMap
+                        Ok((table_id_str, database_schema_table))
+                    })
+                    .collect::<Result<HashMap<String, DatabaseSchemaTable>>>()?;
+
+                // add empty tables
+                for table in tables {
+                    let table_id = table.table_id();
+                    if !schema.contains_key(table_id) {
+                        schema.insert(
+                            table_id.to_string(),
+                            DatabaseSchemaTable::new(table.clone(), TableSchema::empty()),
+                        );
+                    }
+                }
+
+                Ok(DatabaseSchema(schema))
+            }
+        }
+    }
+
+    // Getters
     pub fn created(&self) -> u64 {
         self.created
     }
@@ -33,7 +154,7 @@ impl Database {
     }
 }

-#[derive(Debug, Serialize)]
+#[derive(Debug, Serialize, Clone)]
 pub struct DatabaseTable {
     created: u64,
     database_id: String,
@@ -75,7 +196,7 @@ impl DatabaseTable {
         &self.description
     }
 }
-#[derive(Debug, Serialize)]
+#[derive(Debug, Serialize, Clone)]
 pub struct DatabaseRow {
     created: u64,
     table_id: String,
@@ -106,3 +227,22 @@ impl DatabaseRow {
         &self.content
     }
 }
+
+#[derive(Debug, Serialize)]
+struct DatabaseSchemaTable {
+    table: DatabaseTable,
+    schema: TableSchema,
+}
+
+impl DatabaseSchemaTable {
+    pub fn new(table: DatabaseTable, schema: TableSchema) -> Self {
+        DatabaseSchemaTable { table, schema }
+    }
+
+    pub fn table(&self) -> &DatabaseTable {
+        &self.table
+    }
+}
+
+#[derive(Debug, Serialize)]
+pub struct DatabaseSchema(HashMap<String, DatabaseSchemaTable>);
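
get_schema above gathers each table's rows concurrently and hands their JSON content to TableSchema::from_rows. That function lives in core/src/databases/table_schema.rs, which is one of the 8 changed files but is not shown in this diff. As a rough, hypothetical sketch of what row-based inference involves (the real implementation may well differ), assuming each column takes the JSON type observed across rows and degrades to text on conflict:

use std::collections::HashMap;

use serde_json::{json, Value};

// Hypothetical: classify each column by the JSON values observed across rows.
// The real TableSchema::from_rows lives in table_schema.rs and may differ.
fn infer_schema(rows: &[&Value]) -> HashMap<String, &'static str> {
    let mut schema: HashMap<String, &'static str> = HashMap::new();
    for row in rows {
        if let Some(obj) = row.as_object() {
            for (col, v) in obj {
                let ty = match v {
                    Value::Bool(_) => "bool",
                    Value::Number(n) if n.is_i64() => "int",
                    Value::Number(_) => "float",
                    Value::String(_) => "text",
                    _ => "json",
                };
                // A column seen with conflicting types degrades to "text".
                schema
                    .entry(col.clone())
                    .and_modify(|t| {
                        if *t != ty {
                            *t = "text";
                        }
                    })
                    .or_insert(ty);
            }
        }
    }
    schema
}

fn main() {
    let r1 = json!({"id": 1, "name": "ada"});
    let r2 = json!({"id": 2, "name": "grace", "active": true});
    println!("{:?}", infer_schema(&[&r1, &r2]));
}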