Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FFI initial implementation #12920

Merged
merged 28 commits into from
Oct 30, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
4bb3d58
Initial commit of FFI table provider code
timsaucer Oct 10, 2024
4d31722
Add table type
timsaucer Oct 10, 2024
d5f541e
Make struct pub
timsaucer Oct 10, 2024
afeb439
Implementing supports_filters_pushdown
timsaucer Oct 10, 2024
7fb77da
Move plan properties over to its own file
timsaucer Oct 10, 2024
3ab0f9f
Adding release function
timsaucer Oct 10, 2024
c616227
Adding release functions to additional structs
timsaucer Oct 10, 2024
461b4a9
Resolve memory leaks
timsaucer Oct 10, 2024
0408a9b
Rename ForeignExecutionPlan for consistency
timsaucer Oct 10, 2024
7922d8f
Resolving memory leak issues
timsaucer Oct 14, 2024
afd06be
Remove debug statements. Create runtime for block_on operations
timsaucer Oct 15, 2024
dafc982
Switching over to stable abi and async-ffi
timsaucer Oct 16, 2024
ff4d2e4
Make consistent the use of Foreign and FFI on struct names
timsaucer Oct 16, 2024
1dbca58
Apply prettier
timsaucer Oct 17, 2024
7761c84
Format for linter
timsaucer Oct 17, 2024
d0f8f88
Add doc-comment
timsaucer Oct 17, 2024
6b5227e
Add option to specify table provider does not support pushdown filter…
timsaucer Oct 21, 2024
400f45a
Remove setting default features in cargo file
timsaucer Oct 21, 2024
8b220cd
Tokio only needed for unit tests
timsaucer Oct 21, 2024
8d0f86d
Provide log errors rather than failing silently on schema requests
timsaucer Oct 21, 2024
9c01f75
Set default features for datafusion to false in ffi crate
timsaucer Oct 21, 2024
61f44ae
Using TryFrom or From instead of implementing new when there is only …
timsaucer Oct 26, 2024
1576520
Move arrow wrappers into their own file
timsaucer Oct 26, 2024
790f454
Add documentation
timsaucer Oct 26, 2024
bf626f4
Small adjustment to documentation
timsaucer Oct 27, 2024
bb47819
Add license text
timsaucer Oct 30, 2024
71ae880
Fix unnecessary qualification
timsaucer Oct 30, 2024
011340c
taplo format
timsaucer Oct 30, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Using TryFrom or From instead of implementing new when there is only …
…one parameter
  • Loading branch information
timsaucer committed Oct 30, 2024
commit 61f44aed2db579ad05fe81fb7e0eed491dcae344
55 changes: 26 additions & 29 deletions datafusion/ffi/src/execution_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ use datafusion::{
};

use crate::{
plan_properties::{FFI_PlanProperties, ForeignPlanProperties},
record_batch_stream::FFI_RecordBatchStream,
plan_properties::FFI_PlanProperties, record_batch_stream::FFI_RecordBatchStream,
};

#[repr(C)]
Expand Down Expand Up @@ -73,7 +72,7 @@ unsafe extern "C" fn properties_fn_wrapper(
let private_data = plan.private_data as *const ExecutionPlanPrivateData;
let plan = &(*private_data).plan;

FFI_PlanProperties::new(plan.properties())
plan.properties().into()
}

unsafe extern "C" fn children_fn_wrapper(
Expand Down Expand Up @@ -101,7 +100,7 @@ unsafe extern "C" fn execute_fn_wrapper(
let ctx = &(*private_data).context;

match plan.execute(partition, Arc::clone(ctx)) {
Ok(rbs) => RResult::ROk(FFI_RecordBatchStream::new(rbs)),
Ok(rbs) => RResult::ROk(rbs.into()),
Err(e) => RResult::RErr(
format!("Error occurred during FFI_ExecutionPlan execute: {}", e).into(),
),
Expand Down Expand Up @@ -183,31 +182,29 @@ impl DisplayAs for ForeignExecutionPlan {
}
}

impl ForeignExecutionPlan {
/// Takes ownership of a FFI_ExecutionPlan
///
/// # Safety
///
/// The caller must ensure the pointer provided points to a valid implementation
/// of FFI_ExecutionPlan
pub unsafe fn new(plan: FFI_ExecutionPlan) -> Result<Self> {
let name = (plan.name)(&plan).into();
impl TryFrom<&FFI_ExecutionPlan> for ForeignExecutionPlan {
type Error = DataFusionError;

let properties = ForeignPlanProperties::new((plan.properties)(&plan))?;

let children_rvec = (plan.children)(&plan);
let children: Result<Vec<_>> = children_rvec
.iter()
.map(|child| ForeignExecutionPlan::new(child.clone()))
.map(|child| child.map(|c| Arc::new(c) as Arc<dyn ExecutionPlan>))
.collect();

Ok(Self {
name,
plan,
properties: properties.0,
children: children?,
})
fn try_from(plan: &FFI_ExecutionPlan) -> Result<Self, Self::Error> {
unsafe {
let name = (plan.name)(plan).into();

let properties: PlanProperties = (plan.properties)(plan).try_into()?;

let children_rvec = (plan.children)(plan);
let children: Result<Vec<_>> = children_rvec
.iter()
.map(ForeignExecutionPlan::try_from)
.map(|child| child.map(|c| Arc::new(c) as Arc<dyn ExecutionPlan>))
.collect();

Ok(Self {
name,
plan: plan.clone(),
properties,
children: children?,
})
}
}
}

Expand Down Expand Up @@ -345,7 +342,7 @@ mod tests {

let local_plan = FFI_ExecutionPlan::new(original_plan, ctx.task_ctx());

let foreign_plan = unsafe { ForeignExecutionPlan::new(local_plan)? };
let foreign_plan: ForeignExecutionPlan = (&local_plan).try_into()?;

assert!(original_name == foreign_plan.name());

Expand Down
43 changes: 18 additions & 25 deletions datafusion/ffi/src/plan_properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,8 @@ impl Drop for FFI_PlanProperties {
}
}

impl FFI_PlanProperties {
pub fn new(props: &PlanProperties) -> Self {
impl From<&PlanProperties> for FFI_PlanProperties {
fn from(props: &PlanProperties) -> Self {
let private_data = Box::new(PlanPropertiesPrivateData {
props: props.clone(),
});
Expand All @@ -229,26 +229,19 @@ impl FFI_PlanProperties {
}
}

#[derive(Debug)]
pub struct ForeignPlanProperties(pub PlanProperties);

impl ForeignPlanProperties {
/// Construct a ForeignPlanProperties object from a FFI Plan Properties.
///
/// # Safety
///
/// This function will call the unsafe interfaces on FFI_PlanProperties
/// provided, so the user must ensure it remains valid for the lifetime
/// of the returned struct.
pub unsafe fn new(ffi_props: FFI_PlanProperties) -> Result<Self> {
let ffi_schema = (ffi_props.schema)(&ffi_props);
impl TryFrom<FFI_PlanProperties> for PlanProperties {
type Error = DataFusionError;

fn try_from(ffi_props: FFI_PlanProperties) -> Result<Self, Self::Error> {
let ffi_schema = unsafe { (ffi_props.schema)(&ffi_props) };
let schema = (&ffi_schema.0).try_into()?;

// TODO Extend FFI to get the registry and codec
let default_ctx = SessionContext::new();
let codex = DefaultPhysicalExtensionCodec {};

let orderings = match (ffi_props.output_ordering)(&ffi_props) {
let ffi_orderings = unsafe { (ffi_props.output_ordering)(&ffi_props) };
let orderings = match ffi_orderings {
ROk(ordering_vec) => {
let proto_output_ordering =
PhysicalSortExprNodeCollection::decode(ordering_vec.as_ref())
Expand All @@ -263,7 +256,8 @@ impl ForeignPlanProperties {
RErr(e) => return Err(DataFusionError::Plan(e.to_string())),
};

let partitioning = match (ffi_props.output_partitioning)(&ffi_props) {
let ffi_partitioning = unsafe { (ffi_props.output_partitioning)(&ffi_props) };
let partitioning = match ffi_partitioning {
ROk(partitioning_vec) => {
let proto_output_partitioning =
Partitioning::decode(partitioning_vec.as_ref())
Expand All @@ -282,7 +276,8 @@ impl ForeignPlanProperties {
RErr(e) => Err(DataFusionError::Plan(e.to_string())),
}?;

let execution_mode: ExecutionMode = (ffi_props.execution_mode)(&ffi_props).into();
let execution_mode: ExecutionMode =
unsafe { (ffi_props.execution_mode)(&ffi_props).into() };

let eq_properties = match orderings {
Some(ordering) => {
Expand All @@ -291,11 +286,11 @@ impl ForeignPlanProperties {
None => EquivalenceProperties::new(Arc::new(schema)),
};

Ok(Self(PlanProperties::new(
Ok(PlanProperties::new(
eq_properties,
partitioning,
execution_mode,
)))
))
}
}

Expand All @@ -317,13 +312,11 @@ mod tests {
ExecutionMode::Unbounded,
);

let local_props_ptr = FFI_PlanProperties::new(&original_props);

let foreign_props = unsafe { ForeignPlanProperties::new(local_props_ptr)? };
let local_props_ptr = FFI_PlanProperties::from(&original_props);

let returned_props: PlanProperties = foreign_props.0;
let foreign_props: PlanProperties = local_props_ptr.try_into()?;

assert!(format!("{:?}", returned_props) == format!("{:?}", original_props));
assert!(format!("{:?}", foreign_props) == format!("{:?}", original_props));

Ok(())
}
Expand Down
4 changes: 2 additions & 2 deletions datafusion/ffi/src/record_batch_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ pub struct FFI_RecordBatchStream {
pub private_data: *mut c_void,
}

impl FFI_RecordBatchStream {
pub fn new(stream: SendableRecordBatchStream) -> Self {
impl From<SendableRecordBatchStream> for FFI_RecordBatchStream {
fn from(stream: SendableRecordBatchStream) -> Self {
FFI_RecordBatchStream {
poll_next: poll_next_fn_wrapper,
schema: schema_fn_wrapper,
Expand Down
27 changes: 10 additions & 17 deletions datafusion/ffi/src/session_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ use abi_stable::{
std_types::{RHashMap, RString},
StableAbi,
};
use datafusion::prelude::SessionConfig;
use datafusion::{config::ConfigOptions, error::Result};
use datafusion::{error::DataFusionError, prelude::SessionConfig};

#[repr(C)]
#[derive(Debug, StableAbi)]
Expand Down Expand Up @@ -84,9 +84,8 @@ struct SessionConfigPrivateData {
pub config: ConfigOptions,
}

impl FFI_SessionConfig {
/// Creates a new [`FFI_SessionConfig`].
pub fn new(session: &SessionConfig) -> Self {
impl From<&SessionConfig> for FFI_SessionConfig {
fn from(session: &SessionConfig) -> Self {
let mut config_keys = Vec::new();
let mut config_values = Vec::new();
for config_entry in session.options().entries() {
Expand Down Expand Up @@ -128,17 +127,11 @@ impl Drop for FFI_SessionConfig {

pub struct ForeignSessionConfig(pub SessionConfig);

impl ForeignSessionConfig {
/// Create a session config object from a foreign provider.
///
/// # Safety
///
/// This function will dereference the provided config pointer and will
/// access its unsafe methods. It is the provider's responsibility that
/// this pointer and its internal functions remain valid for the lifetime
/// of the returned struct.
pub unsafe fn new(config: &FFI_SessionConfig) -> Result<Self> {
let config_options = (config.config_options)(config);
impl TryFrom<&FFI_SessionConfig> for ForeignSessionConfig {
type Error = DataFusionError;

fn try_from(config: &FFI_SessionConfig) -> Result<Self, Self::Error> {
let config_options = unsafe { (config.config_options)(config) };

let mut options_map = HashMap::new();
config_options.iter().for_each(|kv_pair| {
Expand All @@ -158,9 +151,9 @@ mod tests {
let session_config = SessionConfig::new();
let original_options = session_config.options().entries();

let ffi_config = FFI_SessionConfig::new(&session_config);
let ffi_config: FFI_SessionConfig = (&session_config).into();

let foreign_config = unsafe { ForeignSessionConfig::new(&ffi_config)? };
let foreign_config: ForeignSessionConfig = (&ffi_config).try_into()?;

let returned_options = foreign_config.0.options().entries();

Expand Down
12 changes: 6 additions & 6 deletions datafusion/ffi/src/table_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ unsafe extern "C" fn scan_fn_wrapper(
let session_config = session_config.clone();

async move {
let config = match ForeignSessionConfig::new(&session_config) {
let config = match ForeignSessionConfig::try_from(&session_config) {
Ok(c) => c,
Err(e) => return RResult::RErr(e.to_string().into()),
};
Expand Down Expand Up @@ -266,8 +266,8 @@ pub struct ForeignTableProvider(FFI_TableProvider);
unsafe impl Send for ForeignTableProvider {}
unsafe impl Sync for ForeignTableProvider {}

impl ForeignTableProvider {
pub fn new(provider: &FFI_TableProvider) -> Self {
impl From<&FFI_TableProvider> for ForeignTableProvider {
fn from(provider: &FFI_TableProvider) -> Self {
Self(provider.clone())
}
}
Expand Down Expand Up @@ -300,7 +300,7 @@ impl TableProvider for ForeignTableProvider {
filters: &[Expr],
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
let session_config = FFI_SessionConfig::new(session.config());
let session_config: FFI_SessionConfig = session.config().into();

let projections: Option<RVec<usize>> =
projection.map(|p| p.iter().map(|v| v.to_owned()).collect());
Expand All @@ -322,7 +322,7 @@ impl TableProvider for ForeignTableProvider {
.await;

match maybe_plan {
RResult::ROk(p) => ForeignExecutionPlan::new(p)?,
RResult::ROk(p) => ForeignExecutionPlan::try_from(&p)?,
RResult::RErr(_) => {
return Err(datafusion::error::DataFusionError::Internal(
"Unable to perform scan via FFI".to_string(),
Expand Down Expand Up @@ -403,7 +403,7 @@ mod tests {

let ffi_provider = FFI_TableProvider::new(provider, true);

let foreign_table_provider = ForeignTableProvider::new(&ffi_provider);
let foreign_table_provider: ForeignTableProvider = (&ffi_provider).into();

ctx.register_table("t", Arc::new(foreign_table_provider))?;

Expand Down