diff --git a/merlin/dag/executors.py b/merlin/dag/executors.py index 15ce96d9..2cd667d4 100644 --- a/merlin/dag/executors.py +++ b/merlin/dag/executors.py @@ -388,20 +388,25 @@ def transform( if col_dtype: output_dtypes[col_name] = md.dtype(col_dtype).to_numpy - def empty_like(df): + def empty_like(df, cols): # Construct an empty DataFrame with the same dtypes as df return df._constructor( - {k: df._constructor_sliced([], dtype=df[k].dtype) for k in df.columns} + { + col: df._constructor_sliced( + [], dtype=df[col].dtype if col in df.columns else "float64" + ) + for col in cols + } ) if isinstance(output_dtypes, dict) and isinstance(ddf._meta, pd.DataFrame): dtypes = output_dtypes - output_dtypes = empty_like(ddf._meta[columns]) + output_dtypes = empty_like(ddf._meta, columns) for col_name, col_dtype in dtypes.items(): output_dtypes[col_name] = output_dtypes[col_name].astype(col_dtype) elif not output_dtypes: - output_dtypes = empty_like(ddf._meta[columns]) + output_dtypes = empty_like(ddf._meta, columns) return ensure_optimize_dataframe_graph( ddf=ddf.map_partitions(