Skip to content

Commit

Permalink
Skeleton full function processing
Browse files Browse the repository at this point in the history
  • Loading branch information
matthewmturner committed Jan 29, 2025
1 parent e531864 commit 84cb7cd
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 29 deletions.
25 changes: 4 additions & 21 deletions src/execution/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ use color_eyre::eyre::eyre;
use datafusion::logical_expr::LogicalPlan;
use futures::TryFutureExt;
use log::{debug, error, info};
use wasmtime::{Module, Store};

use crate::config::ExecutionConfig;
use color_eyre::eyre::{self, Result};
Expand All @@ -38,7 +37,7 @@ use tokio_stream::StreamExt;
use super::executor::dedicated::DedicatedExecutor;
use super::local_benchmarks::LocalBenchmarkStats;
use super::stats::{ExecutionDurationStats, ExecutionStats};
use super::wasm::udf_from_wasm_module;
use super::wasm::create_wasm_udfs;
use super::AppType;

/// Structure for executing queries locally
Expand Down Expand Up @@ -101,26 +100,10 @@ impl ExecutionContext {
// Register Parquet Metadata Function
let session_ctx = session_ctx.enable_url_table();

for (module_path, funcs) in &config.wasm_udf.module_functions {
let store = Store::<()>::default();
let module_bytes = std::fs::read(module_path)?;
let module = Module::from_binary(store.engine(), &module_bytes).unwrap();
for func_details in funcs {
let (input_types, output_types) = udf_signature_from_config(func_details);
let wasm_udf = udf_from_wasm_module(&module, &func_details.name);
session_ctx.register_udf(wasm_udf);
}
let wasm_udfs = create_wasm_udfs(&config.wasm_udf)?;
for wasm_udf in wasm_udfs {
session_ctx.register_udf(wasm_udf);
}
// for wasm_dir in &config.wasm_udf.function_directories {
// if wasm_dir.is_dir() {
// for entry in wasm_dir.read_dir()? {
// // let udf = wasm_file_to_udf(entry?);
// // session_ctx.register_udf()
// }
// } else {
// error!("WASM directory {wasm_dir:?} is not a directory")
// }
// }

session_ctx.register_udtf(
"parquet_metadata",
Expand Down
72 changes: 64 additions & 8 deletions src/execution/wasm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,22 @@
// specific language governing permissions and limitations
// under the License.

use color_eyre::Result;
use datafusion::{arrow::datatypes::DataType, logical_expr::ScalarUDF, prelude::create_udf};
use wasmtime::Module;
use std::sync::Arc;

use crate::config::WasmFuncDetails;
use color_eyre::{eyre::eyre, Result};
use datafusion::{
arrow::datatypes::DataType,
logical_expr::{ColumnarValue, ScalarUDF, Volatility},
prelude::create_udf,
};
use datafusion_common::{DataFusionError, Result as DFResult, ScalarValue};
use log::error;
use wasmtime::{Instance, Module, Store};

pub fn udf_signature_from_config(
func_details: WasmFuncDetails,
use crate::config::{WasmFuncDetails, WasmUdfConfig};

pub fn udf_signature_from_func_details(
func_details: &WasmFuncDetails,
) -> Result<(Vec<DataType>, DataType)> {
let input_types: Result<Vec<DataType>> = func_details
.input_types
Expand All @@ -37,6 +45,54 @@ pub fn udf_signature_from_config(
Ok((input_types?, return_type))
}

pub fn udf_from_wasm_module(module: &Module, func_name: &str) -> ScalarUDF {
create_udf()
/// Builds the closure used as the implementation of a WASM-backed scalar UDF.
///
/// The returned closure takes ownership of the raw module bytes. On every invocation it
/// compiles those bytes into a [`Module`] and instantiates it inside a fresh [`Store`].
/// Either step failing is reported as a `DataFusionError::Internal`.
///
/// This is still a skeleton: the arguments are not passed to the WASM function yet and the
/// closure always yields a null scalar once the module has been instantiated.
fn create_wasm_udf_impl(
    module_bytes: Vec<u8>,
) -> impl Fn(&[ColumnarValue]) -> DFResult<ColumnarValue> {
    move |_args: &[ColumnarValue]| {
        // A new store (and therefore engine) is created for each call.
        let mut wasm_store = Store::<()>::default();

        // Re-compile the module from the captured bytes.
        let compiled = match Module::from_binary(wasm_store.engine(), &module_bytes) {
            Ok(compiled) => compiled,
            Err(e) => {
                return Err(DataFusionError::Internal(format!(
                    "Error loading module: {e:?}"
                )))
            }
        };

        // Instantiate with no imports; the instance itself is not used yet.
        if let Err(e) = Instance::new(&mut wasm_store, &compiled, &[]) {
            return Err(DataFusionError::Internal(format!(
                "Error instantiating module: {e:?}"
            )));
        }

        Ok(ColumnarValue::Scalar(ScalarValue::Null))
    }
}

pub fn create_wasm_udfs(wasm_udf_config: &WasmUdfConfig) -> Result<Vec<ScalarUDF>> {
let mut created_udfs: Vec<ScalarUDF> = Vec::new();
for (module_path, funcs) in &wasm_udf_config.module_functions {
let mut store = Store::<()>::default();
let module_bytes = std::fs::read(module_path)?;
let module = Module::from_binary(store.engine(), &module_bytes).unwrap();
for func_details in funcs {
match udf_signature_from_func_details(func_details) {
Ok((input_types, return_type)) => {
let instance =
Instance::new(&mut store, &module, &[]).map_err(|e| eyre!("{e}"))?;
// Check if the function exists in the WASM module before proceeding with the
// UDF creation
if instance.get_func(&mut store, &func_details.name).is_none() {
error!("WASM function {} is missing in module", &func_details.name);
} else {
let udf_impl = create_wasm_udf_impl(module_bytes.to_owned());
let udf = create_udf(
&func_details.name,
input_types,
return_type,
Volatility::Immutable,
Arc::new(udf_impl),
);
created_udfs.push(udf)
}
}
Err(_) => {
error!("Error parsing WASM UDF signature for {}", func_details.name);
}
}
}
}
Ok(created_udfs)
}

0 comments on commit 84cb7cd

Please sign in to comment.