OneHotEncoder serialization failed #779

Open
inardini opened this issue Oct 3, 2021 · 3 comments

inardini commented Oct 3, 2021

To whom it may concern,

I'm using mleap-pyspark to serialize the following pipeline using pyspark 3.0.2 and mleap 0.18.1.

imputer --> string_indexer --> imputer --> string_indexer --> one_hot_encoder --> vector_assembler --> scaler --> vector_assembler --> random_classifier

But I get this error:

Py4JJavaError: An error occurred while calling o99817.serializeToBundle.
: java.lang.RuntimeException: unsupported attribute for field loan_term_idx_imputed
  at org.apache.spark.ml.bundle.ops.feature.OneHotEncoderOp$.sizeForField(OneHotEncoderOp.scala:31)
  at org.apache.spark.ml.bundle.ops.feature.OneHotEncoderOp$$anon$1.$anonfun$store$2(OneHotEncoderOp.scala:47)
  at org.apache.spark.ml.bundle.ops.feature.OneHotEncoderOp$$anon$1.$anonfun$store$2$adapted(OneHotEncoderOp.scala:47)
  at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
  at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
  at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
  at scala.collection.TraversableLike.map(TraversableLike.scala:238)
  at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
  at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
  at org.apache.spark.ml.bundle.ops.feature.OneHotEncoderOp$$anon$1.store(OneHotEncoderOp.scala:47)
  at org.apache.spark.ml.bundle.ops.feature.OneHotEncoderOp$$anon$1.store(OneHotEncoderOp.scala:37)
  at ml.combust.bundle.serializer.ModelSerializer.$anonfun$write$1(ModelSerializer.scala:87)
  at scala.util.Try$.apply(Try.scala:213)
  at ml.combust.bundle.serializer.ModelSerializer.write(ModelSerializer.scala:83)
  at ml.combust.bundle.serializer.NodeSerializer.$anonfun$write$1(NodeSerializer.scala:85)
  at scala.util.Try$.apply(Try.scala:213)
  at ml.combust.bundle.serializer.NodeSerializer.write(NodeSerializer.scala:81)
  at ml.combust.bundle.serializer.GraphSerializer.$anonfun$writeNode$1(GraphSerializer.scala:34)
  at scala.util.Try$.apply(Try.scala:213)
  at ml.combust.bundle.serializer.GraphSerializer.writeNode(GraphSerializer.scala:30)
  at ml.combust.bundle.serializer.GraphSerializer.$anonfun$write$2(GraphSerializer.scala:21)
  at scala.collection.IndexedSeqOptimized.foldLeft(IndexedSeqOptimized.scala:60)
  at scala.collection.IndexedSeqOptimized.foldLeft$(IndexedSeqOptimized.scala:68)
  at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:38)
  at ml.combust.bundle.serializer.GraphSerializer.write(GraphSerializer.scala:21)
  at org.apache.spark.ml.bundle.ops.PipelineOp$$anon$1.store(PipelineOp.scala:21)
  at org.apache.spark.ml.bundle.ops.PipelineOp$$anon$1.store(PipelineOp.scala:14)
  at ml.combust.bundle.serializer.ModelSerializer.$anonfun$write$1(ModelSerializer.scala:87)
  at scala.util.Try$.apply(Try.scala:213)
  at ml.combust.bundle.serializer.ModelSerializer.write(ModelSerializer.scala:83)
  at ml.combust.bundle.serializer.NodeSerializer.$anonfun$write$1(NodeSerializer.scala:85)
  at scala.util.Try$.apply(Try.scala:213)
  at ml.combust.bundle.serializer.NodeSerializer.write(NodeSerializer.scala:81)
  at ml.combust.bundle.serializer.BundleSerializer.$anonfun$write$1(BundleSerializer.scala:34)
  at scala.util.Try$.apply(Try.scala:213)
  at ml.combust.bundle.serializer.BundleSerializer.write(BundleSerializer.scala:29)
  at ml.combust.bundle.BundleWriter.save(BundleWriter.scala:34)
  at ml.combust.mleap.spark.SimpleSparkSerializer.$anonfun$serializeToBundleWithFormat$4(SimpleSparkSerializer.scala:26)
  at resource.AbstractManagedResource.$anonfun$acquireFor$1(AbstractManagedResource.scala:88)
  at scala.util.control.Exception$Catch.$anonfun$either$1(Exception.scala:252)
  at scala.util.control.Exception$Catch.apply(Exception.scala:228)
  at scala.util.control.Exception$Catch.either(Exception.scala:252)
  at resource.AbstractManagedResource.acquireFor(AbstractManagedResource.scala:88)
  at resource.ManagedResourceOperations.apply(ManagedResourceOperations.scala:26)
  at resource.ManagedResourceOperations.apply$(ManagedResourceOperations.scala:26)
  at resource.AbstractManagedResource.apply(AbstractManagedResource.scala:50)
  at resource.DeferredExtractableManagedResource.$anonfun$tried$1(AbstractManagedResource.scala:33)
  at scala.util.Try$.apply(Try.scala:213)
  at resource.DeferredExtractableManagedResource.tried(AbstractManagedResource.scala:33)
  at ml.combust.mleap.spark.SimpleSparkSerializer.serializeToBundleWithFormat(SimpleSparkSerializer.scala:25)
  at ml.combust.mleap.spark.SimpleSparkSerializer.serializeToBundle(SimpleSparkSerializer.scala:17)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
  at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
  at py4j.Gateway.invoke(Gateway.java:282)
  at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
  at py4j.commands.CallCommand.execute(CallCommand.java:79)
  at py4j.GatewayConnection.run(GatewayConnection.java:238)
  at java.lang.Thread.run(Thread.java:748)

Any insights?

jsleight (Contributor) commented Oct 4, 2021

From looking at the source code, this error is caused by the one hot encoder op not being able to resolve the categorical size of the loan_term_idx_imputed field. It expects the field to be either nominal or binary and I think it is getting an "unresolved" type right now.

A few follow-up questions:

  • Are you setting InputCol or InputCols for the one hot encoder? MLeap seems to only support InputCols at the moment, which I'm guessing you are using already, but just to confirm.
  • Looks like the one hot encoder has the loan_term_idx_imputed column as part of its InputCols? Is that what you expected? I.e., you didn't want the string indexer output to be the one hot encoder input?
  • Can you let us know what the schema metadata is for this column? I.e., first transform the dataframe with your pipeline, then do df.select(df.loan_term_idx_imputed).schema[0].metadata. Also report back with this metadata for the string indexer output column if you don't mind. This information is what gets used to determine whether the field is nominal/binary/numeric/unresolved (see the sketch below).
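
For reference, a minimal PySpark sketch of that inspection (hypothetical names: fitted_pipeline is your fitted PipelineModel and train is your training DataFrame):

# Print the ml_attr metadata that is used to decide whether a column is
# nominal/binary/numeric/unresolved when resolving categorical sizes.
transformed = fitted_pipeline.transform(train)
for col_name in ["loan_term_idx", "loan_term_idx_imputed"]:
    meta = transformed.select(col_name).schema[0].metadata
    print(col_name, "->", meta)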

inardini (Author) commented Oct 5, 2021

Thanks for the feedback.

Point by point:

  • I'm setting InputCols; the code is below:

one_hot_encoder = OneHotEncoder(dropLast=False, inputCols=categorical_imputer.getOutputCols(), outputCols=ONE_HOT_ENCODED_FEATURES, handleInvalid='keep')

  • Yep, it is expected. But basically, are you suggesting to impute the categorical variables first, then string-index them and apply the one hot encoder?

  • About the test, below is the transformation journey of the loan_term variable:

     1. train.select(train.loan_term).schema[0].metadata --> {}
     2. train.select(train.loan_term_idx).schema[0].metadata --> {'ml_attr': {'name': 'loan_term_idx', 'type': 'nominal', 'vals': ['360', '180', 'nan', '480', '120', '300', '240', '60', '__unknown']}}
     3. train.select(train.loan_term_idx_imputed).schema[0].metadata --> {}
    

Hope it helps.

jsleight (Contributor) commented Oct 5, 2021

Thanks for the requested info.

So what is happening is that the loan_term_idx field is considered a "nominal" attribute, but loan_term_idx_imputed has reset that attribute status, so it is now "unresolved". MLeap's one hot encoder op requires that the transformer have a fixed size of state, which it infers from the vals.size in the metadata of each of the inputCols.
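
To illustrate the inference (a rough Python sketch of the behaviour described above, not MLeap's actual Scala code):

# A nominal attribute contributes len(vals) categories, a binary one 2;
# anything else (e.g. the empty metadata on loan_term_idx_imputed) fails
# with "unsupported attribute for field ...".
def size_for_field(metadata):
    attr = metadata.get("ml_attr", {})
    if attr.get("type") == "nominal":
        return len(attr["vals"])
    if attr.get("type") == "binary":
        return 2
    raise RuntimeError("unsupported attribute for field " + str(attr.get("name")))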

Since MLeap is on Spark v3 now, we could look at the new OneHotEncoderModel.categorySizes property instead of inferring things from metadata. We would need to change this line (the sizeForField logic in OneHotEncoderOp.scala).
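
For context, the fitted PySpark model already exposes that information; a quick check, assuming ohe_model is the fitted OneHotEncoderModel stage of your pipeline:

# categorySizes gives the fixed number of categories per input column,
# independent of the ml_attr metadata on the DataFrame.
print(ohe_model.categorySizes)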

So there are two paths forward for you:

  1. Wait for this fix to go through. If you're up for it, I'm happy to review and merge a PR 😄; then you'd need to wait for the next release or use a snapshot.
  2. Alter your pipeline. I think you can either put the imputer before the string indexer, or maybe even remove the imputer altogether and let the one hot encoder's handleInvalid='keep' handle the values which would otherwise be imputed (see the sketch below).
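
A minimal sketch of option 2 (hypothetical column names; the second imputer is dropped so the nominal metadata from the string indexer is preserved):

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder

# Index the raw categorical column, keeping unseen/missing values, then
# one-hot encode the index directly -- no imputer in between.
string_indexer = StringIndexer(
    inputCol="loan_term", outputCol="loan_term_idx", handleInvalid="keep")
one_hot_encoder = OneHotEncoder(
    dropLast=False, inputCols=["loan_term_idx"],
    outputCols=["loan_term_ohe"], handleInvalid="keep")
pipeline = Pipeline(stages=[string_indexer, one_hot_encoder])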

Cheers!
