Skip to content

Commit

Permalink
feat(extract-mdl): implement new function to extract used models
Browse files Browse the repository at this point in the history
  • Loading branch information
grieve54706 committed Nov 29, 2024
1 parent 8d63cbd commit 9065118
Show file tree
Hide file tree
Showing 6 changed files with 506 additions and 3 deletions.
136 changes: 136 additions & 0 deletions wren-core-py/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

use crate::errors::CoreError;
use crate::manifest::PyManifest;
use crate::remote_functions::PyRemoteFunction;
use base64::prelude::BASE64_STANDARD;
use base64::Engine;
Expand Down Expand Up @@ -195,6 +196,39 @@ impl PySessionContext {
});
Ok(builder.values().cloned().collect())
}

/// Parse the given SQL and return the names of the tables it uses.
///
/// Only tables that belong to the catalog and schema of the current MDL are
/// returned. A reference that omits its catalog and/or schema is treated as
/// matching the MDL's catalog/schema, so unqualified names such as
/// `customer` are reported instead of being silently dropped.
///
/// # Errors
///
/// Returns a [`CoreError`] when the SQL cannot be parsed into a statement or
/// when its table references cannot be resolved.
pub fn resolve_used_table_names(&self, sql: &str) -> Result<Vec<String>, CoreError> {
    let mdl = self.mdl.wren_mdl();
    // Fetch the session state once and reuse it for both steps.
    let state = self.ctx.state();
    let statement = state
        .sql_to_statement(sql, "generic")
        .map_err(CoreError::from)?;
    let tables = state
        .resolve_table_references(&statement)
        .map_err(CoreError::from)?;
    Ok(tables
        .iter()
        .filter(|t| {
            // A missing qualifier must not disqualify the table: only an
            // explicit catalog/schema that differs from the MDL's does.
            t.catalog().map_or(true, |catalog| catalog == mdl.catalog())
                && t.schema().map_or(true, |schema| schema == mdl.schema())
        })
        .map(|t| t.table().to_string())
        .collect())
}

/// Build a reduced manifest that contains only the datasets in use.
///
/// `used_datasets` lists the names of the datasets (models or views) that
/// must be kept; every other dataset is removed. When a kept model is related
/// to another dataset, both datasets are kept, together with the
/// relationship between them.
pub fn extract_manifest(&self, used_datasets: Vec<String>) -> PyResult<PyManifest> {
    let manifest = extractor::extract_manifest(self, used_datasets.as_slice());
    Ok(manifest)
}
}

impl PySessionContext {
Expand Down Expand Up @@ -244,3 +278,105 @@ impl PySessionContext {
}
}
}

mod extractor {
use crate::context::PySessionContext;
use crate::manifest::PyManifest;
use std::collections::HashSet;
use std::sync::Arc;
use wren_core::mdl::manifest::{Model, Relationship, View};
use wren_core::mdl::WrenMDL;

pub fn extract_manifest(
ctx: &PySessionContext,
used_datasets: &[String],
) -> PyManifest {
let mdl = Arc::clone(&ctx.mdl).wren_mdl();
let used_models = extract_models(&mdl, used_datasets);
let (used_views, models_of_views) = extract_views(&ctx, &mdl, used_datasets);
let used_relationships = extract_relationships(&mdl, used_datasets);
PyManifest {
catalog: mdl.catalog().to_string(),
schema: mdl.schema().to_string(),
models: [used_models, models_of_views].concat(),
relationships: used_relationships,
metrics: mdl.metrics().to_vec(),
views: used_views,
}
}

/// Collect the models named in `used_datasets` together with every model
/// reachable from them through relationship columns.
///
/// Starting from the given names, each model's columns are inspected; when a
/// column refers to a relationship known to the MDL, all models listed by that
/// relationship are added and explored in turn. The returned models follow the
/// order of `mdl.models()`.
fn extract_models(mdl: &Arc<WrenMDL>, used_datasets: &[String]) -> Vec<Arc<Model>> {
    let mut reachable: HashSet<String> = used_datasets.iter().cloned().collect();
    let mut pending: Vec<String> = used_datasets.to_vec();

    while let Some(current) = pending.pop() {
        if let Some(model) = mdl.get_model(&current) {
            for column in model.columns.iter() {
                let relationship = column
                    .relationship
                    .as_ref()
                    .and_then(|rel_name| mdl.get_relationship(rel_name));
                if let Some(relationship) = relationship {
                    for related in relationship.models.iter() {
                        // `insert` returns true only for names not seen yet,
                        // so each model is explored at most once.
                        if reachable.insert(related.clone()) {
                            pending.push(related.clone());
                        }
                    }
                }
            }
        }
    }

    mdl.models()
        .iter()
        .filter(|model| reachable.contains(model.name()))
        .cloned()
        .collect()
}

/// Select the views named in `used_datasets` and the models those views use.
///
/// Returns `(views, models)`:
/// * `views` are the MDL views whose name appears in `used_datasets`, in the
///   order of `mdl.views()`;
/// * `models` are the models obtained by resolving the tables used by each
///   selected view's statement and expanding them with `extract_models`.
///   Each model is returned once, even when several views reference it or a
///   dataset name is repeated in `used_datasets`.
///
/// A view whose statement cannot be resolved contributes no models; the view
/// itself is still returned.
fn extract_views(
    ctx: &PySessionContext,
    mdl: &Arc<WrenMDL>,
    used_datasets: &[String],
) -> (Vec<Arc<View>>, Vec<Arc<Model>>) {
    let used_set: HashSet<&str> = used_datasets.iter().map(String::as_str).collect();

    // Tracks model names already emitted so overlapping views do not
    // produce duplicate models.
    let mut seen_models: HashSet<String> = HashSet::new();
    let models = used_datasets
        .iter()
        .filter_map(|dataset_name| mdl.get_view(dataset_name))
        // Best effort: resolution errors are deliberately ignored here.
        .filter_map(|view| ctx.resolve_used_table_names(&view.statement).ok())
        .flat_map(|used_tables| extract_models(mdl, &used_tables))
        .filter(|model| seen_models.insert(model.name().to_string()))
        .collect::<Vec<_>>();

    let views = mdl
        .views()
        .iter()
        .filter(|view| used_set.contains(view.name()))
        .cloned()
        .collect();

    (views, models)
}

/// Select the relationships that involve at least one used dataset.
///
/// The working set starts as `used_datasets`. Every name in it is looked up
/// with `mdl.get_relationship`; when a relationship is found, its models are
/// added to the set and looked up in turn. A relationship of the MDL is kept
/// when any of its models is in the final set. The result follows the order of
/// `mdl.relationships()`.
///
/// NOTE(review): the lookup key passed to `get_relationship` is a dataset
/// name. If relationship names never coincide with dataset names, the walk
/// adds nothing and only the initial `used_datasets` are matched. Confirm
/// that this is the intended behaviour.
fn extract_relationships(
    mdl: &Arc<WrenMDL>,
    used_datasets: &[String],
) -> Vec<Arc<Relationship>> {
    let mut touched: HashSet<String> = used_datasets.iter().cloned().collect();
    let mut pending: Vec<String> = used_datasets.to_vec();

    while let Some(name) = pending.pop() {
        // Names not seen before are recorded and queued for their own lookup.
        let newly_seen: Vec<String> = mdl
            .get_relationship(&name)
            .into_iter()
            .flat_map(|relationship| relationship.models.clone())
            .filter(|model| touched.insert(model.clone()))
            .collect();
        pending.extend(newly_seen);
    }

    let mut selected = Vec::new();
    for relationship in mdl.relationships().iter() {
        if relationship.models.iter().any(|model| touched.contains(model)) {
            selected.push(relationship.clone());
        }
    }
    selected
}
}
2 changes: 2 additions & 0 deletions wren-core-py/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use remote_functions::PyRemoteFunction;

pub mod context;
mod errors;
mod manifest;
pub mod remote_functions;

#[pymodule]
Expand All @@ -12,5 +13,6 @@ fn wren_core_wrapper(m: &Bound<'_, PyModule>) -> PyResult<()> {
env_logger::init();
m.add_class::<context::PySessionContext>()?;
m.add_class::<PyRemoteFunction>()?;
m.add_class::<manifest::PyManifest>()?;
Ok(())
}
Loading

0 comments on commit 9065118

Please sign in to comment.