Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BUG] lightgbm and CrossValidator are not compatible #2323

Open
5 of 19 tasks
stupidoge opened this issue Dec 2, 2024 · 3 comments
Open
5 of 19 tasks

[BUG] lightgbm and CrossValidator are not compatible #2323

stupidoge opened this issue Dec 2, 2024 · 3 comments

Comments

@stupidoge
Copy link

SynapseML version

synapseml_2.12:1.0.8

System information

  • Language version (python 3.8, scala 2.12):
  • Spark Version (3.5.0):
  • Spark Platform (Databricks):

Describe the problem

I tried to combine synpase.ml.lightgbm with CrossValidator and hyperopt on Databricks. There are some trials and issues:
1. tried hyperopt and lightgbm, failed after 43 iterations, and pop out an message:

86%|████████▌ | 43/50 [5:39:26<55:15, 473.64s/trial, best loss: 0.22394105399688446]
Py4JJavaError: An error occurred while calling o46084.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 64 in stage 1606.0 failed 4 times, most recent failure: Lost task 64.3 in stage 1606.0 (TID 80923) (10.189.98.69 executor 195): java.net.ConnectException: Connection refused (Connection refused)

2.tried lightgbm with hyperopt and CrossValidator
there are not progress bar after using CrossValidator, if I limit the data size into 1000,
I succeed if the data is limited into 1000, but failed to feed the whole data. Even in the 1000 rows of data, the speed is extremly slow and Databricks doesn't show progress bar, which means it may not utilize parallel computing. I tried batches, autoScaling, numTasks and dynamicAllocation, also barrier. But all of those don't work for me.

error message:

Py4JJavaError: An error occurred while calling o1009.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 69.0 failed 4 times, most recent failure: Lost task 0.3 in stage 69.0 (TID 1619) (10.189.96.80 executor 3): java.net.ConnectException: Connection refused (Connection refused)
	at java.net.PlainSocketImpl.socketConnect(Native Method)
	at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
	at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
	at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
	at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
	at java.net.Socket.connect(Socket.java:613)
	at java.net.Socket.connect(Socket.java:561)
	at java.net.Socket.<init>(Socket.java:457)
	at java.net.Socket.<init>(Socket.java:234)
	at com.microsoft.azure.synapse.ml.lightgbm.NetworkManager$.getNetworkTopologyInfoFromDriver(NetworkManager.scala:133)
	at com.microsoft.azure.synapse.ml.lightgbm.NetworkManager$.$anonfun$getGlobalNetworkInfo$2(NetworkManager.scala:120)
	at com.microsoft.azure.synapse.ml.core.utils.FaultToleranceUtils$.retryWithTimeout(FaultToleranceUtils.scala:24)
	at com.microsoft.azure.synapse.ml.core.utils.FaultToleranceUtils$.retryWithTimeout(FaultToleranceUtils.scala:29)
	at com.microsoft.azure.synapse.ml.core.utils.FaultToleranceUtils$.retryWithTimeout(FaultToleranceUtils.scala:29)
	at com.microsoft.azure.synapse.ml.core.utils.FaultToleranceUtils$.retryWithTimeout(FaultToleranceUtils.scala:29)
	at com.microsoft.azure.synapse.ml.core.utils.FaultToleranceUtils$.retryWithTimeout(FaultToleranceUtils.scala:29)
	at com.microsoft.azure.synapse.ml.lightgbm.NetworkManager$.$anonfun$getGlobalNetworkInfo$1(NetworkManager.scala:115)
	at com.microsoft.azure.synapse.ml.core.env.StreamUtilities$.using(StreamUtilities.scala:28)
	at com.microsoft.azure.synapse.ml.lightgbm.NetworkManager$.getGlobalNetworkInfo(NetworkManager.scala:111)
	at com.microsoft.azure.synapse.ml.lightgbm.BasePartitionTask.initialize(BasePartitionTask.scala:197)
	at com.microsoft.azure.synapse.ml.lightgbm.BasePartitionTask.mapPartitionTask(BasePartitionTask.scala:132)
	at com.microsoft.azure.synapse.ml.lightgbm.LightGBMBase.$anonfun$executePartitionTasks$1(LightGBMBase.scala:615)
	at org.apache.spark.sql.execution.MapPartitionsExec.$anonfun$doExecute$3(objects.scala:226)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:938)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:938)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
	at org.apache.spark.rdd.RDD.$anonfun$computeOrReadCheckpoint$1(RDD.scala:413)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:410)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:377)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
	at org.apache.spark.rdd.RDD.$anonfun$computeOrReadCheckpoint$1(RDD.scala:413)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:410)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:377)
	at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:82)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:82)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:211)
	at org.apache.spark.scheduler.Task.doRunTask(Task.scala:199)
	at org.apache.spark.scheduler.Task.$anonfun$run$5(Task.scala:161)
	at com.databricks.unity.UCSEphemeralState$Handle.runWith(UCSEphemeralState.scala:51)
	at com.databricks.unity.HandleImpl.runWith(UCSHandle.scala:104)
	at com.databricks.unity.HandleImpl.$anonfun$runWithAndClose$1(UCSHandle.scala:109)
	at scala.util.Using$.resource(Using.scala:269)
	at com.databricks.unity.HandleImpl.runWithAndClose(UCSHandle.scala:108)
	at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:155)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.scheduler.Task.run(Task.scala:102)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$10(Executor.scala:1042)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:110)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:1045)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:932)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$failJobAndIndependentStages$1(DAGScheduler.scala:4018)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:4016)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:3930)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:3917)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:3917)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1766)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1749)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1749)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:4277)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:4179)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:4165)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:55)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$runJob$1(DAGScheduler.scala:1412)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1400)
	at org.apache.spark.SparkContext.runJobInternal(SparkContext.scala:3157)
	at org.apache.spark.sql.execution.collect.Collector.$anonfun$runSparkJobs$1(Collector.scala:303)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
	at org.apache.spark.sql.execution.collect.Collector.runSparkJobs(Collector.scala:299)
	at org.apache.spark.sql.execution.collect.Collector.$anonfun$collect$1(Collector.scala:384)
	at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
	at org.apache.spark.sql.execution.collect.Collector.collect(Collector.scala:381)
	at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:122)
	at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:131)
	at org.apache.spark.sql.execution.qrc.InternalRowFormat$.collect(cachedSparkResults.scala:94)
	at org.apache.spark.sql.execution.qrc.InternalRowFormat$.collect(cachedSparkResults.scala:90)
	at org.apache.spark.sql.execution.qrc.InternalRowFormat$.collect(cachedSparkResults.scala:78)
	at org.apache.spark.sql.execution.qrc.ResultCacheManager.$anonfun$computeResult$1(ResultCacheManager.scala:555)
	at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
	at org.apache.spark.sql.execution.qrc.ResultCacheManager.collectResult$1(ResultCacheManager.scala:546)
	at org.apache.spark.sql.execution.qrc.ResultCacheManager.computeResult(ResultCacheManager.scala:563)
	at org.apache.spark.sql.execution.qrc.ResultCacheManager.$anonfun$getOrComputeResultInternal$1(ResultCacheManager.scala:401)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.execution.qrc.ResultCacheManager.getOrComputeResultInternal(ResultCacheManager.scala:400)
	at org.apache.spark.sql.execution.qrc.ResultCacheManager.getOrComputeResult(ResultCacheManager.scala:319)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeCollectResult$1(SparkPlan.scala:572)
	at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
	at org.apache.spark.sql.execution.SparkPlan.executeCollectResult(SparkPlan.scala:569)
	at org.apache.spark.sql.Dataset.collectResult(Dataset.scala:3844)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4816)
	at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:3811)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$3(Dataset.scala:4807)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:1177)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4805)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$10(SQLExecution.scala:462)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:800)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$1(SQLExecution.scala:334)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1180)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId0(SQLExecution.scala:205)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:737)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4805)
	at org.apache.spark.sql.Dataset.collect(Dataset.scala:3811)
	at com.microsoft.azure.synapse.ml.lightgbm.LightGBMBase.executePartitionTasks(LightGBMBase.scala:623)
	at com.microsoft.azure.synapse.ml.lightgbm.LightGBMBase.executeTraining(LightGBMBase.scala:598)
	at com.microsoft.azure.synapse.ml.lightgbm.LightGBMBase.trainOneDataBatch(LightGBMBase.scala:446)
	at com.microsoft.azure.synapse.ml.lightgbm.LightGBMBase.$anonfun$train$5(LightGBMBase.scala:56)
	at scala.collection.IndexedSeqOptimized.foldLeft(IndexedSeqOptimized.scala:60)
	at scala.collection.IndexedSeqOptimized.foldLeft$(IndexedSeqOptimized.scala:68)
	at scala.collection.mutable.ArrayOps$ofRef.foldLeft(ArrayOps.scala:198)
	at com.microsoft.azure.synapse.ml.lightgbm.LightGBMBase.$anonfun$train$2(LightGBMBase.scala:48)
	at com.microsoft.azure.synapse.ml.logging.SynapseMLLogging.logVerb(SynapseMLLogging.scala:163)
	at com.microsoft.azure.synapse.ml.logging.SynapseMLLogging.logVerb$(SynapseMLLogging.scala:160)
	at com.microsoft.azure.synapse.ml.lightgbm.LightGBMClassifier.logVerb(LightGBMClassifier.scala:27)
	at com.microsoft.azure.synapse.ml.logging.SynapseMLLogging.logFit(SynapseMLLogging.scala:153)
	at com.microsoft.azure.synapse.ml.logging.SynapseMLLogging.logFit$(SynapseMLLogging.scala:152)
	at com.microsoft.azure.synapse.ml.lightgbm.LightGBMClassifier.logFit(LightGBMClassifier.scala:27)
	at com.microsoft.azure.synapse.ml.lightgbm.LightGBMBase.train(LightGBMBase.scala:64)
	at com.microsoft.azure.synapse.ml.lightgbm.LightGBMBase.train$(LightGBMBase.scala:36)
	at com.microsoft.azure.synapse.ml.lightgbm.LightGBMClassifier.train(LightGBMClassifier.scala:27)
	at com.microsoft.azure.synapse.ml.lightgbm.LightGBMClassifier.train(LightGBMClassifier.scala:27)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:115)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:397)
	at py4j.Gateway.invoke(Gateway.java:306)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:199)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:119)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.net.ConnectException: Connection refused (Connection refused)
	at java.net.PlainSocketImpl.socketConnect(Native Method)
	at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
	at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
	at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
	at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
	at java.net.Socket.connect(Socket.java:613)
	at java.net.Socket.connect(Socket.java:561)
	at java.net.Socket.<init>(Socket.java:457)
	at java.net.Socket.<init>(Socket.java:234)
	at com.microsoft.azure.synapse.ml.lightgbm.NetworkManager$.getNetworkTopologyInfoFromDriver(NetworkManager.scala:133)
	at com.microsoft.azure.synapse.ml.lightgbm.NetworkManager$.$anonfun$getGlobalNetworkInfo$2(NetworkManager.scala:120)
	at com.microsoft.azure.synapse.ml.core.utils.FaultToleranceUtils$.retryWithTimeout(FaultToleranceUtils.scala:24)
	at com.microsoft.azure.synapse.ml.core.utils.FaultToleranceUtils$.retryWithTimeout(FaultToleranceUtils.scala:29)
	at com.microsoft.azure.synapse.ml.core.utils.FaultToleranceUtils$.retryWithTimeout(FaultToleranceUtils.scala:29)
	at com.microsoft.azure.synapse.ml.core.utils.FaultToleranceUtils$.retryWithTimeout(FaultToleranceUtils.scala:29)
	at com.microsoft.azure.synapse.ml.core.utils.FaultToleranceUtils$.retryWithTimeout(FaultToleranceUtils.scala:29)
	at com.microsoft.azure.synapse.ml.lightgbm.NetworkManager$.$anonfun$getGlobalNetworkInfo$1(NetworkManager.scala:115)
	at com.microsoft.azure.synapse.ml.core.env.StreamUtilities$.using(StreamUtilities.scala:28)
	at com.microsoft.azure.synapse.ml.lightgbm.NetworkManager$.getGlobalNetworkInfo(NetworkManager.scala:111)
	at com.microsoft.azure.synapse.ml.lightgbm.BasePartitionTask.initialize(BasePartitionTask.scala:197)
	at com.microsoft.azure.synapse.ml.lightgbm.BasePartitionTask.mapPartitionTask(BasePartitionTask.scala:132)
	at com.microsoft.azure.synapse.ml.lightgbm.LightGBMBase.$anonfun$executePartitionTasks$1(LightGBMBase.scala:615)
	at org.apache.spark.sql.execution.MapPartitionsExec.$anonfun$doExecute$3(objects.scala:226)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:938)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:938)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
	at org.apache.spark.rdd.RDD.$anonfun$computeOrReadCheckpoint$1(RDD.scala:413)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:410)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:377)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
	at org.apache.spark.rdd.RDD.$anonfun$computeOrReadCheckpoint$1(RDD.scala:413)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:410)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:377)
	at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:82)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:82)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:211)
	at org.apache.spark.scheduler.Task.doRunTask(Task.scala:199)
	at org.apache.spark.scheduler.Task.$anonfun$run$5(Task.scala:161)
	at com.databricks.unity.UCSEphemeralState$Handle.runWith(UCSEphemeralState.scala:51)
	at com.databricks.unity.HandleImpl.runWith(UCSHandle.scala:104)
	at com.databricks.unity.HandleImpl.$anonfun$runWithAndClose$1(UCSHandle.scala:109)
	at scala.util.Using$.resource(Using.scala:269)
	at com.databricks.unity.HandleImpl.runWithAndClose(UCSHandle.scala:108)
	at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:155)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.scheduler.Task.run(Task.scala:102)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$10(Executor.scala:1042)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:110)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:1045)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:932)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
File <command-3334781331052689>, line 39
     25 evaluator = BinaryClassificationEvaluator(
     26   labelCol="conversion_flag",
     27   rawPredictionCol="rawPrediction",
     28   metricName="areaUnderROC"
     29 )
     31 cv = CrossValidator(
     32   estimator=lgbm,
     33   estimatorParamMaps=paramGrid,
   (...)
     36   parallelism=8 # parallel
     37 )
---> 39 cv_model = cv.fit(train_assembler)
     40 best_model = cv_model.bestModel
     41 predictions = best_model.transform(val_assembler)
File /databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
    324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325 if answer[1] == REFERENCE_TYPE:
--> 326     raise Py4JJavaError(
    327         "An error occurred while calling {0}{1}{2}.\n".
    328         format(target_id, ".", name), value)
    329 else:
    330     raise Py4JError(
    331         "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
    332         format(target_id, ".", name, value))

Code to reproduce issue

lgbm_assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
train_assembler = lgbm_assembler.transform(train_data)
val_assembler = lgbm_assembler.transform(val_data)

lgbm = LightGBMClassifier(
  objective="binary",
  featuresCol="features",
  labelCol="conversion_flag",
  numIterations=600,
  isUnbalance=True,
  isProvideTrainingMetric=True,
  metric="auc",
  numTasks=4,
  numBatches=5
  )


# grid search
paramGrid = ParamGridBuilder() \
  .addGrid(lgbm.numLeaves, [31, 63]) \
  .addGrid(lgbm.maxDepth, [8, 10]) \
  .addGrid(lgbm.learningRate, [0.1, 0.01]) \
  .build()

evaluator = BinaryClassificationEvaluator(
  labelCol="conversion_flag",
  rawPredictionCol="rawPrediction",
  metricName="areaUnderROC"
)

cv = CrossValidator(
  estimator=lgbm,
  estimatorParamMaps=paramGrid,
  evaluator=evaluator,
  numFolds=5, # 5-fold CV
  parallelism=4 # parallel
)

cv_model = cv.fit(train_assembler)
best_model = cv_model.bestModel
predictions = best_model.transform(val_assembler)

Other info / logs

No response

What component(s) does this bug affect?

  • area/cognitive: Cognitive project
  • area/core: Core project
  • area/deep-learning: DeepLearning project
  • area/lightgbm: Lightgbm project
  • area/opencv: Opencv project
  • area/vw: VW project
  • area/website: Website
  • area/build: Project build system
  • area/notebooks: Samples under notebooks folder
  • area/docker: Docker usage
  • area/models: models related issue

What language(s) does this bug affect?

  • language/scala: Scala source code
  • language/python: Pyspark APIs
  • language/r: R APIs
  • language/csharp: .NET APIs
  • language/new: Proposals for new client languages

What integration(s) does this bug affect?

  • integrations/synapse: Azure Synapse integrations
  • integrations/azureml: Azure ML integrations
  • integrations/databricks: Databricks integrations
@stupidoge stupidoge added the bug label Dec 2, 2024
@github-actions github-actions bot added the triage label Dec 2, 2024
@stupidoge stupidoge changed the title [BUG] [BUG] lightgbm and CrossValidator are not compatible Dec 2, 2024
@stupidoge
Copy link
Author

@mhamilton723 @memoryz @mhamilton723 @dylanw-oss @svotaw @imatiach-msft
Hi Team, anyone have a chance to check it? I believe connection error related issue has been posted several times, but they all did not be fixed. Also, cross-validation is so important for training a model, only utilizing some parameters on same train/validation dataset will cause over-fitting issue.

Thanks again if you have time to assign this task and give me some suggestions. I represent those users have same issues to highly appreciate your time and effort!

@stupidoge
Copy link
Author

by the way, the dataset is around 7 million, and 700 more features. The data type are numerical and VectorUDT() which is transferred from FeatureHash. All null value and error data is excluded, because I could run our lightgbmClassifier successfully.

from pyspark.sql.functions import lit
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from synapse.ml.lightgbm import LightGBMClassifier

from pyspark.ml.evaluation import BinaryClassificationEvaluator

feature_cols = [c for c in train_data.columns if c != "conversion_flag"]
lgbm_assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

lgbm = LightGBMClassifier(
    objective="binary",
    featuresCol="features",
    labelCol="conversion_flag",
    numIterations=500,
    isUnbalance=True,
    isProvideTrainingMetric=True,
    metric="auc",
    verbosity=1
)


# lgbm.setPassThroughArgs("print_every_n_iterations=5")
pipeline = Pipeline(stages=[lgbm_assembler, lgbm])
model = pipeline.fit(train_data)
predictions = model.transform(val_data)


evaluator_auc = BinaryClassificationEvaluator(labelCol="conversion_flag", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
auc_roc = evaluator_auc.evaluate(predictions)

evaluator_pr = BinaryClassificationEvaluator(labelCol="conversion_flag", rawPredictionCol="rawPrediction", metricName="areaUnderPR") 
auc_pr = evaluator_pr.evaluate(predictions)

print(f"AUC-ROC : {auc_roc}")
print(f"AUC-PR : {auc_pr}")

@stupidoge
Copy link
Author

Finally succeed by using enclosed structure, but have a low processing time and don't show progress bar on Databricks. Could anyone try to reproduce this bugs? I believe cross-validation is important to avoid over-fitting. Thank you so much if there is anyone could help me out.

# what is setExecutionMode = straming means?
# could this be changed into numBatches to save time?
# it seems it will take a long time to start databricks
# is this maintain the same case whne you try other pysaprk.ml models?
from pyspark.sql.functions import lit
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from synapse.ml.lightgbm import LightGBMClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder


# vectorize features
hash_cols = [c for c in train_data.columns if c.startswith('hash_')]
feature_cols = [c for c in train_data.columns if c != "conversion_flag"]
# feature_cols = [c for c in train_data.columns if c != "conversion_flag"]

lgbm_assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

lgbm = LightGBMClassifier(
  objective="binary",
  featuresCol="features",
  labelCol="conversion_flag",
  numIterations=600,
  isUnbalance=True,
  useBarrierExecutionMode=True,
  dataTransferMode='streaming',
  earlyStoppingRound=30
  )


# grid search
paramGrid = ParamGridBuilder() \
  .addGrid(lgbm.numLeaves, [31, 63]) \
  .addGrid(lgbm.maxDepth, [8, 10]) \
  .addGrid(lgbm.learningRate, [0.1, 0.01]) \
  .build()

evaluator = BinaryClassificationEvaluator(
  labelCol="conversion_flag",
  rawPredictionCol="rawPrediction",
  metricName="areaUnderROC"
)

cv = CrossValidator(
  estimator=lgbm,
  estimatorParamMaps=paramGrid,
  evaluator=evaluator,
  numFolds=5, # 5-fold CV
  parallelism=4
  )


pipeline = Pipeline(stages=[lgbm_assembler, cv])
cv_model = pipeline.fit(train_data)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

1 participant