Skip to content

Commit

Permalink
structured data: app platform DB block (#2614)
Browse files Browse the repository at this point in the history
* Add database block

* DB schema no array + pass DB info as config to DB block

---------

Co-authored-by: Henry Fontanier <[email protected]>
  • Loading branch information
fontanierh and Henry Fontanier authored Nov 22, 2023
1 parent 8adcd73 commit 7a0ee75
Show file tree
Hide file tree
Showing 5 changed files with 170 additions and 33 deletions.
7 changes: 5 additions & 2 deletions core/src/blocks/block.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::blocks::{
browser::Browser, chat::Chat, code::Code, curl::Curl, data::Data, data_source::DataSource,
database_schema::DatabaseSchema, end::End, input::Input, llm::LLM, map::Map, r#while::While,
reduce::Reduce, search::Search,
database::Database, database_schema::DatabaseSchema, end::End, input::Input, llm::LLM,
map::Map, r#while::While, reduce::Reduce, search::Search,
};
use crate::data_sources::qdrant::QdrantClients;
use crate::project::Project;
Expand Down Expand Up @@ -75,6 +75,7 @@ pub enum BlockType {
While,
End,
DatabaseSchema,
Database,
}

impl ToString for BlockType {
Expand All @@ -94,6 +95,7 @@ impl ToString for BlockType {
BlockType::While => String::from("while"),
BlockType::End => String::from("end"),
BlockType::DatabaseSchema => String::from("database_schema"),
BlockType::Database => String::from("database"),
}
}
}
Expand Down Expand Up @@ -196,6 +198,7 @@ pub fn parse_block(t: BlockType, block_pair: Pair<Rule>) -> Result<Box<dyn Block
BlockType::While => Ok(Box::new(While::parse(block_pair)?)),
BlockType::End => Ok(Box::new(End::parse(block_pair)?)),
BlockType::DatabaseSchema => Ok(Box::new(DatabaseSchema::parse(block_pair)?)),
BlockType::Database => Ok(Box::new(Database::parse(block_pair)?)),
}
}

Expand Down
141 changes: 141 additions & 0 deletions core/src/blocks/database.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
use crate::blocks::block::{parse_pair, Block, BlockResult, BlockType, Env};
use crate::Rule;
use anyhow::{anyhow, Result};
use async_trait::async_trait;

use pest::iterators::Pair;
use serde_json::{json, Value};
use tokio::sync::mpsc::UnboundedSender;

use super::block::replace_variables_in_string;
use super::helpers::get_data_source_project;

#[derive(Clone)]
pub struct Database {
query: String,
}

impl Database {
pub fn parse(block_pair: Pair<Rule>) -> Result<Self> {
let mut query: Option<String> = None;

for pair in block_pair.into_inner() {
match pair.as_rule() {
Rule::pair => {
let (key, value) = parse_pair(pair)?;
match key.as_str() {
"query" => query = Some(value),
_ => Err(anyhow!("Unexpected `{}` in `database` block", key))?,
}
}
Rule::expected => Err(anyhow!(
"`expected` is not yet supported in `database` block"
))?,
_ => unreachable!(),
}
}

if !query.is_some() {
Err(anyhow!("Missing required `query` in `database` block"))?;
}

Ok(Database {
query: query.unwrap(),
})
}
}

#[async_trait]
impl Block for Database {
fn block_type(&self) -> BlockType {
BlockType::Database
}

fn inner_hash(&self) -> String {
let mut hasher = blake3::Hasher::new();
hasher.update("database_schema".as_bytes());
hasher.update(self.query.as_bytes());
format!("{}", hasher.finalize().to_hex())
}

async fn execute(
&self,
name: &str,
env: &Env,
_event_sender: Option<UnboundedSender<Value>>,
) -> Result<BlockResult> {
let config = env.config.config_for_block(name);

let err_msg = format!(
"Invalid or missing `database` in configuration for \
`database` block `{}` expecting `{{ \"database\": \
{{ \"workspace_id\": ..., \"data_source_id\": ..., \"database_id\": ... }} }}`",
name
);

let (workspace_id, data_source_id, database_id) = match config {
Some(v) => match v.get("database") {
Some(Value::Object(o)) => {
let workspace_id = match o.get("workspace_id") {
Some(Value::String(s)) => s,
_ => Err(anyhow!(err_msg.clone()))?,
};
let data_source_id = match o.get("data_source_id") {
Some(Value::String(s)) => s,
_ => Err(anyhow!(err_msg.clone()))?,
};
let database_id = match o.get("database_id") {
Some(Value::String(s)) => s,
_ => Err(anyhow!(err_msg.clone()))?,
};

Ok((workspace_id, data_source_id, database_id))
}
_ => Err(anyhow!(err_msg)),
},
None => Err(anyhow!(err_msg)),
}?;

let query = replace_variables_in_string(&self.query, "query", env)?;
let project = get_data_source_project(&workspace_id, &data_source_id, env).await?;

let database = match env
.store
.load_database(&project, &data_source_id, &database_id)
.await?
{
Some(d) => d,
None => Err(anyhow!(
"Database `{}` not found in data source `{}`",
database_id,
data_source_id
))?,
};

let (rows, schema) = match database.query(&project, env.store.clone(), &query).await {
Ok(r) => r,
Err(e) => Err(anyhow!(
"Error querying database `{}` in data source `{}`: {}",
database_id,
data_source_id,
e
))?,
};

Ok(BlockResult {
value: json!({
"rows": rows,
"schema": schema,
}),
meta: None,
})
}

fn clone_box(&self) -> Box<dyn Block + Sync + Send> {
Box::new(self.clone())
}

fn as_any(&self) -> &dyn std::any::Any {
self
}
}
53 changes: 22 additions & 31 deletions core/src/blocks/database_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use crate::blocks::block::{Block, BlockResult, BlockType, Env};
use crate::Rule;
use anyhow::{anyhow, Result};
use async_trait::async_trait;
use futures::future::try_join_all;
use pest::iterators::Pair;
use serde_json::Value;
use tokio::sync::mpsc::UnboundedSender;
Expand Down Expand Up @@ -39,47 +38,39 @@ impl Block for DatabaseSchema {
let config = env.config.config_for_block(name);

let err_msg = format!(
"Invalid or missing `databases` in configuration for \
`database_schema` block `{}` expecting `{{ \"databases\": \
[ {{ \"workspace_id\": ..., \"data_source_id\": ..., \"database_id\": ... }}, ... ] }}`",
"Invalid or missing `database` in configuration for \
`database_schema` block `{}` expecting `{{ \"database\": \
{{ \"workspace_id\": ..., \"data_source_id\": ..., \"database_id\": ... }} }}`",
name
);

let databases = match config {
Some(v) => match v.get("databases") {
Some(Value::Array(a)) => a
.iter()
.map(|v| {
let workspace_id = match v.get("workspace_id") {
Some(Value::String(s)) => s,
_ => Err(anyhow!(err_msg.clone()))?,
};
let data_source_id = match v.get("data_source_id") {
Some(Value::String(s)) => s,
_ => Err(anyhow!(err_msg.clone()))?,
};
let database_id = match v.get("database_id") {
Some(Value::String(s)) => s,
_ => Err(anyhow!(err_msg.clone()))?,
};
let (workspace_id, data_source_id, database_id) = match config {
Some(v) => match v.get("database") {
Some(Value::Object(o)) => {
let workspace_id = match o.get("workspace_id") {
Some(Value::String(s)) => s,
_ => Err(anyhow!(err_msg.clone()))?,
};
let data_source_id = match o.get("data_source_id") {
Some(Value::String(s)) => s,
_ => Err(anyhow!(err_msg.clone()))?,
};
let database_id = match o.get("database_id") {
Some(Value::String(s)) => s,
_ => Err(anyhow!(err_msg.clone()))?,
};

Ok((workspace_id, data_source_id, database_id))
})
.collect::<Result<Vec<_>>>(),
Ok((workspace_id, data_source_id, database_id))
}
_ => Err(anyhow!(err_msg)),
},
None => Err(anyhow!(err_msg)),
}?;

let schemas = try_join_all(databases.iter().map(
|(workspace_id, data_source_id, database_id)| {
get_database_schema(workspace_id, data_source_id, database_id, env)
},
))
.await?;
let schema = get_database_schema(workspace_id, data_source_id, database_id, env).await?;

Ok(BlockResult {
value: serde_json::to_value(schemas)?,
value: serde_json::to_value(schema)?,
meta: None,
})
}
Expand Down
1 change: 1 addition & 0 deletions core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ pub mod blocks {
pub mod curl;
pub mod data;
pub mod data_source;
pub mod database;
pub mod database_schema;
pub mod end;
pub mod helpers;
Expand Down
1 change: 1 addition & 0 deletions core/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ impl RunConfig {
BlockType::While => 64,
BlockType::End => 64,
BlockType::DatabaseSchema => 8,
BlockType::Database => 8,
}
}
}
Expand Down

0 comments on commit 7a0ee75

Please sign in to comment.