diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala index f00d47535d972..fab5e80dd0e69 100644 --- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala +++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala @@ -117,6 +117,7 @@ object LogKey extends Enumeration { val ESTIMATOR_PARAMETER_MAP = Value val EVENT_LOOP = Value val EVENT_QUEUE = Value + val EXCEPTION = Value val EXECUTE_INFO = Value val EXECUTE_KEY = Value val EXECUTION_PLAN_LEAVES = Value diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala index 8d1d500a8ddb9..438ad928a7aaa 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala @@ -358,8 +358,10 @@ case class AdaptiveSparkPlanExec( val newCost = costEvaluator.evaluateCost(newPhysicalPlan) if (newCost < origCost || (newCost == origCost && currentPhysicalPlan != newPhysicalPlan)) { - logOnLevel("Plan changed:\n" + - sideBySide(currentPhysicalPlan.treeString, newPhysicalPlan.treeString).mkString("\n")) + val plans = + sideBySide(currentPhysicalPlan.treeString, newPhysicalPlan.treeString).mkString("\n") + logOnLevel(log"Plan changed:\n" + + log"${MDC(QUERY_PLAN, plans)}") cleanUpTempTags(newPhysicalPlan) currentPhysicalPlan = newPhysicalPlan currentLogicalPlan = newLogicalPlan @@ -389,7 +391,7 @@ case class AdaptiveSparkPlanExec( if (shouldUpdatePlan && currentPhysicalPlan.exists(_.subqueries.nonEmpty)) { getExecutionId.foreach(onUpdatePlan(_, Seq.empty)) } - logOnLevel(s"Final plan:\n$currentPhysicalPlan") + logOnLevel(log"Final plan:\n${MDC(QUERY_PLAN, currentPhysicalPlan)}") } override def executeCollect(): Array[InternalRow] = { @@ -742,7 +744,7 @@ case class AdaptiveSparkPlanExec( Some((finalPlan, optimized)) } catch { case e: InvalidAQEPlanException[_] => - logOnLevel(s"Re-optimize - ${e.getMessage()}:\n${e.plan}") + logOnLevel(log"Re-optimize - ${MDC(EXCEPTION, e.getMessage())}:\n${e.plan}") None } }