Skip to content

Commit

Permalink
Start using the validate_schemas hook from core (#95)
Browse files Browse the repository at this point in the history
This just clean up `compute_input_schema` a bit.
  • Loading branch information
karlhigley authored Oct 7, 2022
1 parent 29f93f0 commit 3fae964
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 23 deletions.
14 changes: 9 additions & 5 deletions merlin/systems/dag/ops/faiss.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,11 +202,6 @@ def compute_input_schema(
input_schema = super().compute_input_schema(
root_schema, parents_schema, deps_schema, selector
)
if len(input_schema.column_schemas) > 1:
raise ValueError(
"More than one input has been detected for this node,"
/ f"inputs received: {input_schema.column_names}"
)
return input_schema

def compute_output_schema(
Expand Down Expand Up @@ -237,6 +232,15 @@ def compute_output_schema(
]
)

def validate_schemas(
self, parents_schema, deps_schema, input_schema, output_schema, strict_dtypes=False
):
if len(input_schema.column_schemas) > 1:
raise ValueError(
"More than one input has been detected for this node,"
/ f"inputs received: {input_schema.column_names}"
)


def setup_faiss(item_vector, output_path: str):
"""
Expand Down
39 changes: 21 additions & 18 deletions merlin/systems/dag/ops/session_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,24 +110,6 @@ def compute_input_schema(
root_schema, parents_schema, deps_schema, selector
)

if len(parents_schema.column_schemas) > 1:
raise ValueError(
"More than one input has been detected for this node,"
/ f"inputs received: {input_schema.column_names}"
)
if len(deps_schema.column_schemas) > 1:
raise ValueError(
"More than one dependency input has been detected"
/ f"for this node, inputs received: {input_schema.column_names}"
)

# 1 for deps and 1 for parents
if len(input_schema.column_schemas) > 2:
raise ValueError(
"More than one input has been detected for this node,"
/ f"inputs received: {input_schema.column_names}"
)

self._input_col = parents_schema.column_names[0]
self._filter_out_col = deps_schema.column_names[0]

Expand Down Expand Up @@ -157,6 +139,27 @@ def compute_output_schema(
"""
return Schema([ColumnSchema("filtered_ids", dtype=np.int32, is_list=False)])

def validate_schemas(
self, parents_schema, deps_schema, input_schema, output_schema, strict_dtypes=False
):
if len(parents_schema.column_schemas) > 1:
raise ValueError(
"More than one input has been detected for this node,"
/ f"inputs received: {input_schema.column_names}"
)
if len(deps_schema.column_schemas) > 1:
raise ValueError(
"More than one dependency input has been detected"
/ f"for this node, inputs received: {input_schema.column_names}"
)

# 1 for deps and 1 for parents
if len(input_schema.column_schemas) > 2:
raise ValueError(
"More than one input has been detected for this node,"
/ f"inputs received: {input_schema.column_names}"
)

def transform(self, df: InferenceDataFrame):
"""
Transform input dataframe to output dataframe using function logic.
Expand Down

0 comments on commit 3fae964

Please sign in to comment.